// risingwave_batch_executors/executor/join/mod.rs
1mod chunked_data;
16mod distributed_lookup_join;
17pub mod hash_join;
18pub mod local_lookup_join;
19mod lookup_join_base;
20pub mod nested_loop_join;
21
22pub use chunked_data::*;
23pub use distributed_lookup_join::*;
24pub use hash_join::*;
25use itertools::Itertools;
26pub use local_lookup_join::*;
27pub use lookup_join_base::*;
28pub use nested_loop_join::*;
29use risingwave_common::array::{DataChunk, RowRef};
30use risingwave_common::row::Row;
31use risingwave_common::types::{DataType, DatumRef};
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType, JoinType as PbJoinType};
34
35use crate::error::Result;
36
/// Join kinds understood by the batch join executors.
///
/// Built from the protobuf representation via `JoinType::from_prost`.
/// `Inner` is the `Default` variant.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum JoinType {
    /// Inner join (the default).
    #[default]
    Inner,
    /// Left outer join.
    LeftOuter,
    /// Left semi join.
    LeftSemi,
    /// Left anti join.
    LeftAnti,
    /// Right outer join.
    RightOuter,
    /// Right semi join.
    RightSemi,
    /// Right anti join.
    RightAnti,
    /// Full outer join.
    FullOuter,
    /// AS OF inner join.
    AsOfInner,
    /// AS OF left outer join.
    AsOfLeftOuter,
}
55
56impl JoinType {
57 pub fn from_prost(prost: PbJoinType) -> Self {
58 match prost {
59 PbJoinType::Inner => JoinType::Inner,
60 PbJoinType::LeftOuter => JoinType::LeftOuter,
61 PbJoinType::LeftSemi => JoinType::LeftSemi,
62 PbJoinType::LeftAnti => JoinType::LeftAnti,
63 PbJoinType::RightOuter => JoinType::RightOuter,
64 PbJoinType::RightSemi => JoinType::RightSemi,
65 PbJoinType::RightAnti => JoinType::RightAnti,
66 PbJoinType::FullOuter => JoinType::FullOuter,
67 PbJoinType::AsofInner => JoinType::AsOfInner,
68 PbJoinType::AsofLeftOuter => JoinType::AsOfLeftOuter,
69 PbJoinType::Unspecified => {
70 unreachable!()
71 }
72 }
73 }
74}
75
/// Test-only classification helpers for [`JoinType`].
///
/// NOTE(review): the names suggest which input sides' columns survive in the
/// join output (both / left only / right only) — confirm against the tests
/// that use them. The AS OF variants return `false` from all three.
#[cfg(test)]
#[allow(dead_code)]
impl JoinType {
    /// `true` for `Inner`, `LeftOuter`, `RightOuter` and `FullOuter`.
    fn keep_all(self) -> bool {
        matches!(
            self,
            Self::Inner | Self::LeftOuter | Self::RightOuter | Self::FullOuter
        )
    }

    /// `true` for `LeftSemi` and `LeftAnti`.
    fn keep_left(self) -> bool {
        matches!(self, Self::LeftSemi | Self::LeftAnti)
    }

    /// `true` for `RightSemi` and `RightAnti`.
    fn keep_right(self) -> bool {
        matches!(self, Self::RightSemi | Self::RightAnti)
    }
}
95
/// Comparison operator of an AS OF join condition.
///
/// Decoded from the protobuf `AsOfJoinInequalityType` by `AsOfDesc::from_protobuf`.
#[derive(Debug, Clone)]
pub enum AsOfInequalityType {
    /// Less than or equal.
    Le,
    /// Strictly less than.
    Lt,
    /// Greater than or equal.
    Ge,
    /// Strictly greater than.
    Gt,
}
103
104#[derive(Clone, Debug)]
105pub struct AsOfDesc {
106 pub left_idx: usize,
107 pub right_idx: usize,
108 pub inequality_type: AsOfInequalityType,
109}
110
111impl AsOfDesc {
112 pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> crate::error::Result<Self> {
113 let typ = match desc_proto.inequality_type() {
114 AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
115 AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
116 AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
117 AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
118 AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
119 bail!("unspecified AsOf join inequality type")
120 }
121 };
122 Ok(Self {
123 left_idx: desc_proto.left_idx as usize,
124 right_idx: desc_proto.right_idx as usize,
125 inequality_type: typ,
126 })
127 }
128}
129
130fn concatenate(left: &DataChunk, right: &DataChunk) -> Result<DataChunk> {
144 assert_eq!(left.capacity(), right.capacity());
145 let mut concated_columns = Vec::with_capacity(left.columns().len() + right.columns().len());
146 concated_columns.extend_from_slice(left.columns());
147 concated_columns.extend_from_slice(right.columns());
148 let vis = match (left.is_compacted(), right.is_compacted()) {
150 (true, _) => right.visibility().clone(),
151 (_, true) => left.visibility().clone(),
152 (false, false) => {
153 panic!("The concatenate behaviour of two chunk with visibility is undefined")
154 }
155 };
156 let data_chunk = DataChunk::new(concated_columns, vis);
157 Ok(data_chunk)
158}
159
160fn convert_datum_refs_to_chunk(
162 datum_refs: &[DatumRef<'_>],
163 num_tuples: usize,
164 data_types: &[DataType],
165) -> Result<DataChunk> {
166 let mut output_array_builders: Vec<_> = data_types
167 .iter()
168 .map(|data_type| data_type.create_array_builder(num_tuples))
169 .collect();
170 for _i in 0..num_tuples {
171 for (builder, datum_ref) in output_array_builders.iter_mut().zip_eq_fast(datum_refs) {
172 builder.append(*datum_ref);
173 }
174 }
175
176 let result_columns = output_array_builders
178 .into_iter()
179 .map(|b| b.finish().into())
180 .collect();
181
182 Ok(DataChunk::new(result_columns, num_tuples))
183}
184
185fn convert_row_to_chunk(
187 row_ref: &RowRef<'_>,
188 num_tuples: usize,
189 data_types: &[DataType],
190) -> Result<DataChunk> {
191 let datum_refs = row_ref.iter().collect_vec();
192 convert_datum_refs_to_chunk(&datum_refs, num_tuples, data_types)
193}
194
#[cfg(test)]
mod tests {

    use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarRefImpl};

    use crate::executor::join::{concatenate, convert_datum_refs_to_chunk, convert_row_to_chunk};

    #[test]
    fn test_concatenate() {
        let num_of_columns: usize = 2;
        let length = 5;
        let mut columns = vec![];
        for i in 0..num_of_columns {
            let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
            for _ in 0..length {
                builder.append(Some(i as i32));
            }
            let arr = builder.finish();
            columns.push(arr.into_ref())
        }
        let chunk1 = DataChunk::new(columns.clone(), length);
        let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]);
        let chunk2 = DataChunk::new(columns.clone(), visibility.clone());

        // Left compacted: the result inherits the right chunk's visibility.
        let chunk = concatenate(&chunk1, &chunk2).unwrap();
        assert_eq!(chunk.capacity(), chunk1.capacity());
        assert_eq!(chunk.capacity(), chunk2.capacity());
        assert_eq!(chunk.columns().len(), chunk1.columns().len() * 2);
        assert_eq!(chunk.visibility(), &visibility);

        // Right compacted: the result inherits the left chunk's visibility.
        let swapped = concatenate(&chunk2, &chunk1).unwrap();
        assert_eq!(swapped.columns().len(), chunk1.columns().len() * 2);
        assert_eq!(swapped.visibility(), &visibility);
    }

    #[test]
    fn test_convert_datum_refs_to_chunk() {
        let row = vec![Some(ScalarRefImpl::Int32(3))];
        let probe_side_schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32)],
        };
        let const_row_chunk =
            convert_datum_refs_to_chunk(&row, 5, &probe_side_schema.data_types()).unwrap();
        assert_eq!(const_row_chunk.capacity(), 5);
        for i in 0..5 {
            assert_eq!(
                const_row_chunk.row_at(i).0.datum_at(0),
                Some(ScalarRefImpl::Int32(3))
            );
        }
    }

    /// Exercises `convert_row_to_chunk` itself: the previous test of that name
    /// only called `convert_datum_refs_to_chunk`.
    #[test]
    fn test_convert_row_to_chunk() {
        let mut builder = PrimitiveArrayBuilder::<i32>::new(3);
        for v in [1, 2, 3] {
            builder.append(Some(v));
        }
        let source = DataChunk::new(vec![builder.finish().into_ref()], 3);
        let (row_ref, _) = source.row_at(1);

        let chunk = convert_row_to_chunk(&row_ref, 4, &[DataType::Int32]).unwrap();
        assert_eq!(chunk.capacity(), 4);
        for i in 0..4 {
            assert_eq!(chunk.row_at(i).0.datum_at(0), Some(ScalarRefImpl::Int32(2)));
        }
    }
}