risingwave_batch_executors/executor/join/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod chunked_data;
16mod distributed_lookup_join;
17pub mod hash_join;
18pub mod local_lookup_join;
19mod lookup_join_base;
20pub mod nested_loop_join;
21
22pub use chunked_data::*;
23pub use distributed_lookup_join::*;
24pub use hash_join::*;
25use itertools::Itertools;
26pub use local_lookup_join::*;
27pub use lookup_join_base::*;
28pub use nested_loop_join::*;
29use risingwave_common::array::{DataChunk, RowRef};
30use risingwave_common::row::Row;
31use risingwave_common::types::{DataType, DatumRef};
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType, JoinType as PbJoinType};
34
35use crate::error::Result;
36
/// Join flavours understood by the batch join executors.
///
/// Per the variant docs below, "left" refers to the probe side and "right" to the build side.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum JoinType {
    /// Inner join. This is the default variant.
    #[default]
    Inner,
    /// Left outer join.
    LeftOuter,
    /// Semi join when probe side should output when matched
    LeftSemi,
    /// Anti join when probe side should not output when matched
    LeftAnti,
    /// Right outer join.
    RightOuter,
    /// Semi join when build side should output when matched
    RightSemi,
    /// Anti join when build side should not output when matched
    RightAnti,
    /// Full outer join.
    FullOuter,
    /// AS OF inner join (converted from `PbJoinType::AsofInner`).
    AsOfInner,
    /// AS OF left outer join (converted from `PbJoinType::AsofLeftOuter`).
    AsOfLeftOuter,
}
55
56impl JoinType {
57    pub fn from_prost(prost: PbJoinType) -> Self {
58        match prost {
59            PbJoinType::Inner => JoinType::Inner,
60            PbJoinType::LeftOuter => JoinType::LeftOuter,
61            PbJoinType::LeftSemi => JoinType::LeftSemi,
62            PbJoinType::LeftAnti => JoinType::LeftAnti,
63            PbJoinType::RightOuter => JoinType::RightOuter,
64            PbJoinType::RightSemi => JoinType::RightSemi,
65            PbJoinType::RightAnti => JoinType::RightAnti,
66            PbJoinType::FullOuter => JoinType::FullOuter,
67            PbJoinType::AsofInner => JoinType::AsOfInner,
68            PbJoinType::AsofLeftOuter => JoinType::AsOfLeftOuter,
69            PbJoinType::Unspecified => {
70                unreachable!()
71            }
72        }
73    }
74}
75
/// Test-only predicates that classify a [`JoinType`] by variant.
#[cfg(test)]
impl JoinType {
    /// Returns `true` for the inner, left outer, right outer and full outer joins.
    fn keep_all(self) -> bool {
        matches!(
            self,
            Self::Inner | Self::LeftOuter | Self::RightOuter | Self::FullOuter
        )
    }

    /// Returns `true` for the left semi and left anti joins.
    fn keep_left(self) -> bool {
        matches!(self, Self::LeftSemi | Self::LeftAnti)
    }

    /// Returns `true` for the right semi and right anti joins.
    fn keep_right(self) -> bool {
        matches!(self, Self::RightSemi | Self::RightAnti)
    }
}
93
/// Inequality operator of an AS OF join condition.
///
/// Each variant is produced from the same-named `AsOfJoinInequalityType` protobuf value by
/// `AsOfDesc::from_protobuf`.
#[derive(Debug, Clone)]
pub enum AsOfInequalityType {
    /// Converted from `AsOfInequalityTypeLe`.
    Le,
    /// Converted from `AsOfInequalityTypeLt`.
    Lt,
    /// Converted from `AsOfInequalityTypeGe`.
    Ge,
    /// Converted from `AsOfInequalityTypeGt`.
    Gt,
}
101
102#[derive(Clone, Debug)]
103pub struct AsOfDesc {
104    pub left_idx: usize,
105    pub right_idx: usize,
106    pub inequality_type: AsOfInequalityType,
107}
108
109impl AsOfDesc {
110    pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> crate::error::Result<Self> {
111        let typ = match desc_proto.inequality_type() {
112            AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
113            AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
114            AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
115            AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
116            AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
117                bail!("unspecified AsOf join inequality type")
118            }
119        };
120        Ok(Self {
121            left_idx: desc_proto.left_idx as usize,
122            right_idx: desc_proto.right_idx as usize,
123            inequality_type: typ,
124        })
125    }
126}
127
128/// The layout be like:
129///
130/// [ `left` chunk     |  `right` chunk     ]
131///
132/// # Arguments
133///
134/// * `left` Data chunk padded to the left half of result data chunk..
135/// * `right` Data chunk padded to the right half of result data chunk.
136///
137/// Note: Use this function with careful: It is not designed to be a general concatenate of two
138/// chunk: Usually one side should be const row chunk and the other side is normal chunk.
139/// Currently only feasible to use in join executor.
140/// If two normal chunk, the result is undefined.
141fn concatenate(left: &DataChunk, right: &DataChunk) -> Result<DataChunk> {
142    assert_eq!(left.capacity(), right.capacity());
143    let mut concated_columns = Vec::with_capacity(left.columns().len() + right.columns().len());
144    concated_columns.extend_from_slice(left.columns());
145    concated_columns.extend_from_slice(right.columns());
146    // Only handle one side is constant row chunk: One of visibility must be None.
147    let vis = match (left.is_vis_compacted(), right.is_vis_compacted()) {
148        (true, _) => right.visibility().clone(),
149        (_, true) => left.visibility().clone(),
150        (false, false) => {
151            panic!("The concatenate behaviour of two chunk with visibility is undefined")
152        }
153    };
154    let data_chunk = DataChunk::new(concated_columns, vis);
155    Ok(data_chunk)
156}
157
158/// Create constant data chunk (one tuple repeat `num_tuples` times).
159fn convert_datum_refs_to_chunk(
160    datum_refs: &[DatumRef<'_>],
161    num_tuples: usize,
162    data_types: &[DataType],
163) -> Result<DataChunk> {
164    let mut output_array_builders: Vec<_> = data_types
165        .iter()
166        .map(|data_type| data_type.create_array_builder(num_tuples))
167        .collect();
168    for _i in 0..num_tuples {
169        for (builder, datum_ref) in output_array_builders.iter_mut().zip_eq_fast(datum_refs) {
170            builder.append(*datum_ref);
171        }
172    }
173
174    // Finish each array builder and get Column.
175    let result_columns = output_array_builders
176        .into_iter()
177        .map(|b| b.finish().into())
178        .collect();
179
180    Ok(DataChunk::new(result_columns, num_tuples))
181}
182
183/// Create constant data chunk (one tuple repeat `num_tuples` times).
184fn convert_row_to_chunk(
185    row_ref: &RowRef<'_>,
186    num_tuples: usize,
187    data_types: &[DataType],
188) -> Result<DataChunk> {
189    let datum_refs = row_ref.iter().collect_vec();
190    convert_datum_refs_to_chunk(&datum_refs, num_tuples, data_types)
191}
192
#[cfg(test)]
mod tests {

    use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarRefImpl};

    use crate::executor::join::{concatenate, convert_datum_refs_to_chunk};

    /// Concatenating a fully visible chunk with a partially visible one keeps the capacity,
    /// doubles the column count and carries over the partial visibility.
    #[test]
    fn test_concatenate() {
        let num_of_columns: usize = 2;
        let length = 5;

        // Column `i` holds the value `i` repeated `length` times.
        let columns: Vec<_> = (0..num_of_columns)
            .map(|i| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
                for _ in 0..length {
                    builder.append(Some(i as i32));
                }
                builder.finish().into_ref()
            })
            .collect();

        let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]);
        let chunk1 = DataChunk::new(columns.clone(), length);
        let chunk2 = DataChunk::new(columns, visibility.clone());

        let chunk = concatenate(&chunk1, &chunk2).unwrap();

        assert_eq!(chunk.capacity(), chunk1.capacity());
        assert_eq!(chunk.capacity(), chunk2.capacity());
        assert_eq!(chunk.columns().len(), chunk1.columns().len() * 2);
        assert_eq!(chunk.visibility(), &visibility);
    }

    /// Test the function of convert row into constant row chunk (one row repeat multiple times).
    #[test]
    fn test_convert_row_to_chunk() {
        let datum_refs = vec![Some(ScalarRefImpl::Int32(3))];
        let probe_side_schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32)],
        };

        let const_row_chunk =
            convert_datum_refs_to_chunk(&datum_refs, 5, &probe_side_schema.data_types()).unwrap();

        assert_eq!(const_row_chunk.capacity(), 5);
        let row = const_row_chunk.row_at(2).0;
        assert_eq!(row.datum_at(0), Some(ScalarRefImpl::Int32(3)));
    }
}