risingwave_batch_executors/executor/join/
mod.rs1mod chunked_data;
16mod distributed_lookup_join;
17pub mod hash_join;
18pub mod local_lookup_join;
19mod lookup_join_base;
20pub mod nested_loop_join;
21
22pub use chunked_data::*;
23pub use distributed_lookup_join::*;
24pub use hash_join::*;
25use itertools::Itertools;
26pub use local_lookup_join::*;
27pub use lookup_join_base::*;
28pub use nested_loop_join::*;
29use risingwave_common::array::{DataChunk, RowRef};
30use risingwave_common::row::Row;
31use risingwave_common::types::{DataType, DatumRef};
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType, JoinType as PbJoinType};
34
35use crate::error::Result;
36
/// Join kinds understood by the join executors in this module.
///
/// Each variant corresponds to one non-`Unspecified` value of the protobuf
/// `PbJoinType`; the mapping is implemented by [`JoinType::from_prost`].
#[derive(Copy, Clone, Debug, Default, PartialEq)]
pub enum JoinType {
    /// Inner join. This is the `Default` value of the enum.
    #[default]
    Inner,
    /// Left outer join.
    LeftOuter,
    /// Left semi join.
    LeftSemi,
    /// Left anti join.
    LeftAnti,
    /// Right outer join.
    RightOuter,
    /// Right semi join.
    RightSemi,
    /// Right anti join.
    RightAnti,
    /// Full outer join.
    FullOuter,
    /// AsOf inner join (maps from `PbJoinType::AsofInner`).
    AsOfInner,
    /// AsOf left outer join (maps from `PbJoinType::AsofLeftOuter`).
    AsOfLeftOuter,
}
55
56impl JoinType {
57 pub fn from_prost(prost: PbJoinType) -> Self {
58 match prost {
59 PbJoinType::Inner => JoinType::Inner,
60 PbJoinType::LeftOuter => JoinType::LeftOuter,
61 PbJoinType::LeftSemi => JoinType::LeftSemi,
62 PbJoinType::LeftAnti => JoinType::LeftAnti,
63 PbJoinType::RightOuter => JoinType::RightOuter,
64 PbJoinType::RightSemi => JoinType::RightSemi,
65 PbJoinType::RightAnti => JoinType::RightAnti,
66 PbJoinType::FullOuter => JoinType::FullOuter,
67 PbJoinType::AsofInner => JoinType::AsOfInner,
68 PbJoinType::AsofLeftOuter => JoinType::AsOfLeftOuter,
69 PbJoinType::Unspecified => {
70 unreachable!()
71 }
72 }
73 }
74}
75
/// Test-only predicates that group join types into three disjoint classes.
///
/// The AsOf variants belong to none of the classes: all three predicates
/// return `false` for `AsOfInner` and `AsOfLeftOuter`.
#[cfg(test)]
impl JoinType {
    /// `true` for `Inner`, `LeftOuter`, `RightOuter` and `FullOuter`.
    fn keep_all(self) -> bool {
        matches!(
            self,
            Self::Inner | Self::LeftOuter | Self::RightOuter | Self::FullOuter
        )
    }

    /// `true` for `LeftSemi` and `LeftAnti`.
    fn keep_left(self) -> bool {
        matches!(self, Self::LeftSemi | Self::LeftAnti)
    }

    /// `true` for `RightSemi` and `RightAnti`.
    fn keep_right(self) -> bool {
        matches!(self, Self::RightSemi | Self::RightAnti)
    }
}
93
/// Kind of inequality used by an AsOf join condition.
///
/// Built from the protobuf `AsOfJoinInequalityType` in
/// [`AsOfDesc::from_protobuf`].
// NOTE(review): which side of the join is the left operand of the comparison
// is not visible in this file — confirm against the executors that consume it.
#[derive(Clone, Debug)]
pub enum AsOfInequalityType {
    /// Less than or equal (`<=`).
    Le,
    /// Strictly less than (`<`).
    Lt,
    /// Greater than or equal (`>=`).
    Ge,
    /// Strictly greater than (`>`).
    Gt,
}
101
/// Executor-side description of an AsOf join's inequality condition,
/// decoded from the protobuf `AsOfJoinDesc`.
#[derive(Clone, Debug)]
pub struct AsOfDesc {
    /// Value of the descriptor's `left_idx`, widened to `usize`.
    // NOTE(review): presumably the column index on the left input — confirm.
    pub left_idx: usize,
    /// Value of the descriptor's `right_idx`, widened to `usize`.
    // NOTE(review): presumably the column index on the right input — confirm.
    pub right_idx: usize,
    /// Which inequality relates the two indexed values.
    pub inequality_type: AsOfInequalityType,
}
108
109impl AsOfDesc {
110 pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> crate::error::Result<Self> {
111 let typ = match desc_proto.inequality_type() {
112 AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
113 AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
114 AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
115 AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
116 AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
117 bail!("unspecified AsOf join inequality type")
118 }
119 };
120 Ok(Self {
121 left_idx: desc_proto.left_idx as usize,
122 right_idx: desc_proto.right_idx as usize,
123 inequality_type: typ,
124 })
125 }
126}
127
128fn concatenate(left: &DataChunk, right: &DataChunk) -> Result<DataChunk> {
142 assert_eq!(left.capacity(), right.capacity());
143 let mut concated_columns = Vec::with_capacity(left.columns().len() + right.columns().len());
144 concated_columns.extend_from_slice(left.columns());
145 concated_columns.extend_from_slice(right.columns());
146 let vis = match (left.is_vis_compacted(), right.is_vis_compacted()) {
148 (true, _) => right.visibility().clone(),
149 (_, true) => left.visibility().clone(),
150 (false, false) => {
151 panic!("The concatenate behaviour of two chunk with visibility is undefined")
152 }
153 };
154 let data_chunk = DataChunk::new(concated_columns, vis);
155 Ok(data_chunk)
156}
157
158fn convert_datum_refs_to_chunk(
160 datum_refs: &[DatumRef<'_>],
161 num_tuples: usize,
162 data_types: &[DataType],
163) -> Result<DataChunk> {
164 let mut output_array_builders: Vec<_> = data_types
165 .iter()
166 .map(|data_type| data_type.create_array_builder(num_tuples))
167 .collect();
168 for _i in 0..num_tuples {
169 for (builder, datum_ref) in output_array_builders.iter_mut().zip_eq_fast(datum_refs) {
170 builder.append(*datum_ref);
171 }
172 }
173
174 let result_columns = output_array_builders
176 .into_iter()
177 .map(|b| b.finish().into())
178 .collect();
179
180 Ok(DataChunk::new(result_columns, num_tuples))
181}
182
183fn convert_row_to_chunk(
185 row_ref: &RowRef<'_>,
186 num_tuples: usize,
187 data_types: &[DataType],
188) -> Result<DataChunk> {
189 let datum_refs = row_ref.iter().collect_vec();
190 convert_datum_refs_to_chunk(&datum_refs, num_tuples, data_types)
191}
192
#[cfg(test)]
mod tests {

    use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarRefImpl};

    use crate::executor::join::{concatenate, convert_datum_refs_to_chunk};

    /// Concatenating a chunk built with a plain row count with a chunk carrying
    /// an explicit visibility bitmap keeps the capacity, doubles the column
    /// count and adopts the explicit bitmap.
    #[test]
    fn test_concatenate() {
        const NUM_COLUMNS: usize = 2;
        const NUM_ROWS: usize = 5;

        // Column `i` holds the value `i` repeated in every row.
        let columns: Vec<_> = (0..NUM_COLUMNS)
            .map(|col_idx| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(NUM_ROWS);
                for _ in 0..NUM_ROWS {
                    builder.append(Some(col_idx as i32));
                }
                builder.finish().into_ref()
            })
            .collect();

        let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]);
        let plain_chunk = DataChunk::new(columns.clone(), NUM_ROWS);
        let masked_chunk = DataChunk::new(columns, visibility.clone());

        let joined = concatenate(&plain_chunk, &masked_chunk).unwrap();

        assert_eq!(joined.capacity(), plain_chunk.capacity());
        assert_eq!(joined.capacity(), masked_chunk.capacity());
        assert_eq!(joined.columns().len(), plain_chunk.columns().len() * 2);
        assert_eq!(joined.visibility(), &visibility);
    }

    /// Repeating a one-column row five times yields a chunk of capacity five
    /// whose rows all carry the original value.
    #[test]
    fn test_convert_row_to_chunk() {
        let row = vec![Some(ScalarRefImpl::Int32(3))];
        let probe_side_schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32)],
        };

        let repeated = convert_datum_refs_to_chunk(&row, 5, &probe_side_schema.data_types())
            .unwrap();

        assert_eq!(repeated.capacity(), 5);
        assert_eq!(
            repeated.row_at(2).0.datum_at(0),
            Some(ScalarRefImpl::Int32(3))
        );
    }
}