risingwave_batch_executors/executor/join/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod chunked_data;
16mod distributed_lookup_join;
17pub mod hash_join;
18pub mod local_lookup_join;
19mod lookup_join_base;
20pub mod nested_loop_join;
21
22pub use chunked_data::*;
23pub use distributed_lookup_join::*;
24pub use hash_join::*;
25use itertools::Itertools;
26pub use local_lookup_join::*;
27pub use lookup_join_base::*;
28pub use nested_loop_join::*;
29use risingwave_common::array::{DataChunk, RowRef};
30use risingwave_common::row::Row;
31use risingwave_common::types::{DataType, DatumRef};
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType, JoinType as PbJoinType};
34
35use crate::error::Result;
36
/// Join types supported by the batch join executors.
///
/// In the semi/anti variants below, the left input is referred to as the probe side and the
/// right input as the build side.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum JoinType {
    /// Inner join. This is the default variant.
    #[default]
    Inner,
    /// Left outer join.
    LeftOuter,
    /// Semi join: the probe (left) side outputs its rows that have a match.
    LeftSemi,
    /// Anti join: the probe (left) side outputs its rows that have no match.
    LeftAnti,
    /// Right outer join.
    RightOuter,
    /// Semi join: the build (right) side outputs its rows that have a match.
    RightSemi,
    /// Anti join: the build (right) side outputs its rows that have no match.
    RightAnti,
    /// Full outer join.
    FullOuter,
    /// AS OF inner join.
    AsOfInner,
    /// AS OF left outer join.
    AsOfLeftOuter,
}
55
56impl JoinType {
57    pub fn from_prost(prost: PbJoinType) -> Self {
58        match prost {
59            PbJoinType::Inner => JoinType::Inner,
60            PbJoinType::LeftOuter => JoinType::LeftOuter,
61            PbJoinType::LeftSemi => JoinType::LeftSemi,
62            PbJoinType::LeftAnti => JoinType::LeftAnti,
63            PbJoinType::RightOuter => JoinType::RightOuter,
64            PbJoinType::RightSemi => JoinType::RightSemi,
65            PbJoinType::RightAnti => JoinType::RightAnti,
66            PbJoinType::FullOuter => JoinType::FullOuter,
67            PbJoinType::AsofInner => JoinType::AsOfInner,
68            PbJoinType::AsofLeftOuter => JoinType::AsOfLeftOuter,
69            PbJoinType::Unspecified => {
70                unreachable!()
71            }
72        }
73    }
74}
75
#[cfg(test)]
impl JoinType {
    // Test-only classification helpers; not every test uses all of them, so silence
    // `dead_code` for the ones a given build leaves unused.
    #![allow(dead_code)]

    /// Returns `true` for `Inner` and the three outer joins (`false` for semi, anti and AS OF).
    fn keep_all(self) -> bool {
        matches!(
            self,
            Self::Inner | Self::LeftOuter | Self::RightOuter | Self::FullOuter
        )
    }

    /// Returns `true` only for `LeftSemi` and `LeftAnti`.
    fn keep_left(self) -> bool {
        matches!(self, Self::LeftSemi | Self::LeftAnti)
    }

    /// Returns `true` only for `RightSemi` and `RightAnti`.
    fn keep_right(self) -> bool {
        matches!(self, Self::RightSemi | Self::RightAnti)
    }
}
95
/// Comparison operator used by the inequality condition of an AS OF join.
///
/// Built from the protobuf `AsOfJoinInequalityType` by `AsOfDesc::from_protobuf`.
/// NOTE(review): presumably applied as `left <op> right` on the columns named by
/// `AsOfDesc::left_idx` / `AsOfDesc::right_idx` — confirm against the executors.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AsOfInequalityType {
    /// `<=`
    Le,
    /// `<`
    Lt,
    /// `>=`
    Ge,
    /// `>`
    Gt,
}

/// Inequality condition of an AS OF join.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AsOfDesc {
    /// Column index on the left input used in the inequality (presumably; verify with callers).
    pub left_idx: usize,
    /// Column index on the right input used in the inequality (presumably; verify with callers).
    pub right_idx: usize,
    /// Operator comparing the two columns.
    pub inequality_type: AsOfInequalityType,
}
110
111impl AsOfDesc {
112    pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> crate::error::Result<Self> {
113        let typ = match desc_proto.inequality_type() {
114            AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
115            AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
116            AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
117            AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
118            AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
119                bail!("unspecified AsOf join inequality type")
120            }
121        };
122        Ok(Self {
123            left_idx: desc_proto.left_idx as usize,
124            right_idx: desc_proto.right_idx as usize,
125            inequality_type: typ,
126        })
127    }
128}
129
130/// The layout be like:
131///
132/// [ `left` chunk     |  `right` chunk     ]
133///
134/// # Arguments
135///
136/// * `left` Data chunk padded to the left half of result data chunk..
137/// * `right` Data chunk padded to the right half of result data chunk.
138///
139/// Note: Use this function with careful: It is not designed to be a general concatenate of two
140/// chunk: Usually one side should be const row chunk and the other side is normal chunk.
141/// Currently only feasible to use in join executor.
142/// If two normal chunk, the result is undefined.
143fn concatenate(left: &DataChunk, right: &DataChunk) -> Result<DataChunk> {
144    assert_eq!(left.capacity(), right.capacity());
145    let mut concated_columns = Vec::with_capacity(left.columns().len() + right.columns().len());
146    concated_columns.extend_from_slice(left.columns());
147    concated_columns.extend_from_slice(right.columns());
148    // Only handle one side is constant row chunk: One of visibility must be None.
149    let vis = match (left.is_compacted(), right.is_compacted()) {
150        (true, _) => right.visibility().clone(),
151        (_, true) => left.visibility().clone(),
152        (false, false) => {
153            panic!("The concatenate behaviour of two chunk with visibility is undefined")
154        }
155    };
156    let data_chunk = DataChunk::new(concated_columns, vis);
157    Ok(data_chunk)
158}
159
160/// Create constant data chunk (one tuple repeat `num_tuples` times).
161fn convert_datum_refs_to_chunk(
162    datum_refs: &[DatumRef<'_>],
163    num_tuples: usize,
164    data_types: &[DataType],
165) -> Result<DataChunk> {
166    let mut output_array_builders: Vec<_> = data_types
167        .iter()
168        .map(|data_type| data_type.create_array_builder(num_tuples))
169        .collect();
170    for _i in 0..num_tuples {
171        for (builder, datum_ref) in output_array_builders.iter_mut().zip_eq_fast(datum_refs) {
172            builder.append(*datum_ref);
173        }
174    }
175
176    // Finish each array builder and get Column.
177    let result_columns = output_array_builders
178        .into_iter()
179        .map(|b| b.finish().into())
180        .collect();
181
182    Ok(DataChunk::new(result_columns, num_tuples))
183}
184
185/// Create constant data chunk (one tuple repeat `num_tuples` times).
186fn convert_row_to_chunk(
187    row_ref: &RowRef<'_>,
188    num_tuples: usize,
189    data_types: &[DataType],
190) -> Result<DataChunk> {
191    let datum_refs = row_ref.iter().collect_vec();
192    convert_datum_refs_to_chunk(&datum_refs, num_tuples, data_types)
193}
194
#[cfg(test)]
mod tests {

    use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarRefImpl};

    use crate::executor::join::{concatenate, convert_datum_refs_to_chunk};

    /// Number of rows in every chunk produced by `build_chunks`.
    const LENGTH: usize = 5;

    /// Builds two chunks over the same two Int32 columns (column `i` holds `i` in every row).
    /// The first is compacted; the second carries a partial visibility, which is also returned.
    fn build_chunks() -> (DataChunk, DataChunk, Bitmap) {
        let num_of_columns: usize = 2;
        let columns: Vec<_> = (0..num_of_columns)
            .map(|i| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(LENGTH);
                for _ in 0..LENGTH {
                    builder.append(Some(i as i32));
                }
                builder.finish().into_ref()
            })
            .collect();
        let compacted = DataChunk::new(columns.clone(), LENGTH);
        let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]);
        let with_visibility = DataChunk::new(columns, visibility.clone());
        (compacted, with_visibility, visibility)
    }

    /// The result inherits `right`'s visibility when `left` is compacted.
    #[test]
    fn test_concatenate() {
        let (chunk1, chunk2, visibility) = build_chunks();
        let chunk = concatenate(&chunk1, &chunk2).unwrap();
        assert_eq!(chunk.capacity(), chunk1.capacity());
        assert_eq!(chunk.capacity(), chunk2.capacity());
        assert_eq!(chunk.columns().len(), chunk1.columns().len() * 2);
        assert_eq!(chunk.visibility(), &visibility);
    }

    /// The result inherits `left`'s visibility when `right` is compacted.
    #[test]
    fn test_concatenate_visibility_from_left() {
        let (compacted, with_visibility, visibility) = build_chunks();
        let chunk = concatenate(&with_visibility, &compacted).unwrap();
        assert_eq!(chunk.capacity(), LENGTH);
        assert_eq!(chunk.columns().len(), compacted.columns().len() * 2);
        assert_eq!(chunk.visibility(), &visibility);
    }

    /// Two chunks that both carry a visibility cannot be concatenated.
    #[test]
    #[should_panic]
    fn test_concatenate_both_with_visibility_panics() {
        let (_, with_visibility, _) = build_chunks();
        let _ = concatenate(&with_visibility, &with_visibility);
    }

    /// Test the function of convert row into constant row chunk (one row repeat multiple times).
    #[test]
    fn test_convert_row_to_chunk() {
        let row = [Some(ScalarRefImpl::Int32(3))];
        let probe_side_schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32)],
        };
        let const_row_chunk =
            convert_datum_refs_to_chunk(&row, 5, &probe_side_schema.data_types()).unwrap();
        assert_eq!(const_row_chunk.capacity(), 5);
        for i in 0..5 {
            assert_eq!(
                const_row_chunk.row_at(i).0.datum_at(0),
                Some(ScalarRefImpl::Int32(3))
            );
        }
    }

    /// Repeating a row zero times yields an empty chunk that still has one column per type.
    #[test]
    fn test_convert_row_to_chunk_zero_tuples() {
        let row = [Some(ScalarRefImpl::Int32(3))];
        let chunk = convert_datum_refs_to_chunk(&row, 0, &[DataType::Int32]).unwrap();
        assert_eq!(chunk.capacity(), 0);
        assert_eq!(chunk.columns().len(), 1);
    }
}