risingwave_batch_executors/executor/join/
lookup_join_base.rs1use std::marker::PhantomData;
16
17use futures::StreamExt;
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use risingwave_common::array::DataChunk;
21use risingwave_common::bitmap::FilterByBitmap;
22use risingwave_common::catalog::Schema;
23use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::row::Row;
26use risingwave_common::types::{DataType, ToOwnedDatum};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_expr::expr::BoxedExpression;
31
32use super::AsOfDesc;
33use crate::error::BatchError;
34use crate::executor::join::chunked_data::ChunkedData;
35use crate::executor::{
36 BoxedDataChunkListStream, BoxedExecutor, BufferChunkExecutor, EquiJoinParams, HashJoinExecutor,
37 JoinHashMap, JoinType, LookupExecutorBuilder, RowId, utils,
38};
39use crate::task::ShutdownToken;
40
/// State shared by lookup-join executors.
///
/// For every batch of outer-side rows, the executor registers the distinct key prefixes of
/// that batch as scan ranges on `inner_side_builder`. It then loads the rows produced by the
/// resulting inner-side executor into an in-memory hash table and probes it with the
/// buffered outer rows using `HashJoinExecutor`'s join routines (see `do_execute`).
pub struct LookupJoinBase<K> {
    /// Kind of join to perform. `do_execute` hits `unimplemented!()` for right-side and full
    /// outer joins.
    pub join_type: JoinType,
    /// Optional non-equi join condition. `do_execute` only applies it when the join
    /// parameters do not describe an as-of join.
    pub condition: Option<BoxedExpression>,
    /// Executor producing the outer-side rows (the probe side of the per-batch hash join).
    pub outer_side_input: BoxedExecutor,
    /// Column types of the outer side, used as the probe-side types of the hash join.
    pub outer_side_data_types: Vec<DataType>,
    /// Outer-side column indices of the equi-join keys. The first `lookup_prefix_len` of
    /// them form the lookup ranges sent to the inner side.
    pub outer_side_key_idxs: Vec<usize>,
    /// Builder that creates, once per outer batch, an inner-side executor restricted to the
    /// scan ranges added via `add_scan_range`.
    pub inner_side_builder: Box<dyn LookupExecutorBuilder>,
    /// Not read by `do_execute`; presumably the types of the inner-side join keys — confirm.
    pub inner_side_key_types: Vec<DataType>,
    /// Inner-side column indices of the equi-join keys (the build-side key indices).
    pub inner_side_key_idxs: Vec<usize>,
    /// Per-key null-safety flags, converted into a null bitmap. A build row is inserted into
    /// the hash table only if every NULL position of its key is flagged here.
    pub null_safe: Vec<bool>,
    /// Number of leading outer-side key columns used to build inner-side lookup ranges.
    pub lookup_prefix_len: usize,
    /// Not read by `do_execute`, which creates its own output `DataChunkBuilder`.
    pub chunk_builder: DataChunkBuilder,
    /// Output schema; its data types size the output chunk builder on the
    /// non-equi-condition path.
    pub schema: Schema,
    /// Columns of each joined chunk that are kept, in output order.
    pub output_indices: Vec<usize>,
    /// Chunk size passed to the hash join and to the output chunk builder.
    pub chunk_size: usize,
    /// As-of join descriptor forwarded to the per-batch hash join parameters.
    pub asof_desc: Option<AsOfDesc>,
    /// Identity string of this executor (not read by `do_execute`).
    pub identity: String,
    /// Shutdown token cloned into every per-batch hash join.
    pub shutdown_rx: ShutdownToken,
    /// Memory context charged with the estimated heap size of buffered build chunks and keys.
    /// It also supplies the allocator for the build-side buffer and the hash table.
    pub mem_ctx: MemoryContext,
    /// Ties the hash-key type `K` to this struct without storing a value of it.
    pub _phantom: PhantomData<K>,
}
64
/// Row threshold (512 rows) passed to `utils::batch_read` when reading the outer side in
/// `LookupJoinBase::do_execute`. Presumably this is the minimum number of outer rows gathered
/// before each round of inner-side lookups — confirm against `utils::batch_read`.
const AT_LEAST_OUTER_SIDE_ROWS: usize = 1 << 9;
66
impl<K: HashKey> LookupJoinBase<K> {
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    /// Runs the lookup join and yields the joined, projected output chunks.
    ///
    /// The outer side is read in batches via `utils::batch_read(.., AT_LEAST_OUTER_SIDE_ROWS)`.
    /// For each batch:
    /// 1. the distinct `lookup_prefix_len`-column key prefixes of its rows are registered as
    ///    scan ranges on `inner_side_builder`;
    /// 2. the inner-side executor built from those ranges is drained into a `JoinHashMap`
    ///    (the build side);
    /// 3. the buffered outer chunks probe that map through `HashJoinExecutor`'s join routines.
    ///    Every result chunk is projected onto `output_indices`.
    ///
    /// # Errors
    /// Propagates errors from the outer input, the inner-side builder and executor, chunked
    /// data allocation, and the hash-join streams.
    ///
    /// # Panics
    /// Hits `unimplemented!()` for `RightOuter`, `RightSemi`, `RightAnti` and `FullOuter`.
    /// On the non-equi-condition path it also does so for `AsOfInner` and `AsOfLeftOuter`.
    pub async fn do_execute(mut self: Box<Self>) {
        let outer_side_schema = self.outer_side_input.schema().clone();

        // Key positions flagged null-safe. A build key whose NULL positions form a subset of
        // this bitmap is still inserted into the hash table below.
        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);

        let mut outer_side_batch_read_stream: BoxedDataChunkListStream =
            utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS);

        while let Some(chunk_list) = outer_side_batch_read_stream.next().await {
            let chunk_list = chunk_list?;

            // Distinct lookup prefixes of this batch. Sorting first lets `dedup` drop every
            // duplicate, so each range is registered once.
            let groups = chunk_list
                .iter()
                .flat_map(|chunk| {
                    chunk.rows().map(|row| {
                        self.outer_side_key_idxs
                            .iter()
                            .take(self.lookup_prefix_len)
                            .map(|&idx| row.datum_at(idx).to_owned_datum())
                            .collect_vec()
                    })
                })
                .sorted_by(|a, b| cmp_datum_iter(a, b, std::iter::repeat(OrderType::default())))
                .dedup()
                .collect_vec();

            // Presumably discards the scan ranges registered for the previous batch before
            // adding this batch's ranges — confirm against `LookupExecutorBuilder::reset`.
            self.inner_side_builder.reset();
            for row_key in groups {
                self.inner_side_builder.add_scan_range(row_key).await?;
            }
            let inner_side_input = self.inner_side_builder.build_executor().await?;

            // The buffered outer chunks become the probe side; the inner executor is the
            // build side.
            let hash_join_probe_side_input = Box::new(BufferChunkExecutor::new(
                outer_side_schema.clone(),
                chunk_list,
            ));
            let hash_join_build_side_input = inner_side_input;
            let hash_join_probe_data_types = self.outer_side_data_types.clone();
            let hash_join_build_data_types = hash_join_build_side_input.schema().data_types();
            let hash_join_probe_side_key_idxs = self.outer_side_key_idxs.clone();
            let hash_join_build_side_key_idxs = self.inner_side_key_idxs.clone();

            // Joined row layout: probe (outer) columns followed by build (inner) columns.
            let full_data_types = [
                hash_join_probe_data_types.clone(),
                hash_join_build_data_types.clone(),
            ]
            .concat();

            // Estimated heap bytes charged to `mem_ctx` for this batch. They are released
            // once the batch has been fully joined (end of the loop body).
            let mut tmp_heap_size = 0i64;

            // Non-empty inner chunks are buffered in a vector allocated through the memory
            // context's allocator. Chunk payloads are charged explicitly via
            // `estimated_heap_size`.
            let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
            let mut build_row_count = 0;
            #[for_await]
            for build_chunk in hash_join_build_side_input.execute() {
                let build_chunk = build_chunk?;
                if build_chunk.cardinality() > 0 {
                    build_row_count += build_chunk.cardinality();
                    let chunk_estimated_heap_size = build_chunk.estimated_heap_size() as i64;
                    self.mem_ctx.add(chunk_estimated_heap_size);
                    tmp_heap_size += chunk_estimated_heap_size;
                    build_side.push(build_chunk);
                }
            }
            let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
                build_row_count,
                PrecomputedBuildHasher,
                self.mem_ctx.global_allocator(),
            );
            // One slot per build row (sized by chunk capacity). Presumably each slot links to
            // the next build row with the same key.
            let mut next_build_row_with_same_key =
                ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
                let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk);

                // Only visible rows are indexed.
                for (build_row_id, build_key) in build_keys
                    .into_iter()
                    .enumerate()
                    .filter_by_bitmap(build_chunk.visibility())
                {
                    // Skip keys with NULL in a position that is not null-safe.
                    if build_key.null_bitmap().is_subset(&null_matched) {
                        let row_id = RowId::new(build_chunk_id, build_row_id);
                        let build_key_estimated_heap_size = build_key.estimated_heap_size() as i64;
                        self.mem_ctx.add(build_key_estimated_heap_size);
                        tmp_heap_size += build_key_estimated_heap_size;
                        // The value previously stored for this key (returned by `insert`) is
                        // recorded for the new row, chaining rows with equal keys.
                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
                    }
                }
            }

            let params = EquiJoinParams::new(
                hash_join_probe_side_input,
                hash_join_probe_data_types,
                hash_join_probe_side_key_idxs,
                build_side,
                hash_join_build_data_types,
                full_data_types,
                hash_map,
                next_build_row_with_same_key,
                self.chunk_size,
                self.shutdown_rx.clone(),
                self.asof_desc.clone(),
            );

            if let Some(cond) = self.condition.as_ref()
                && !params.is_asof_join()
            {
                // Non-equi-condition path (not taken for as-of joins).
                let stream = match self.join_type {
                    JoinType::Inner => {
                        HashJoinExecutor::do_inner_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftOuter => {
                        HashJoinExecutor::do_left_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftSemi => {
                        HashJoinExecutor::do_left_semi_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftAnti => {
                        HashJoinExecutor::do_left_anti_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightOuter
                    | JoinType::RightSemi
                    | JoinType::RightAnti
                    | JoinType::FullOuter
                    | JoinType::AsOfInner
                    | JoinType::AsOfLeftOuter => unimplemented!(),
                };
                // Projected chunks go through a `DataChunkBuilder` sized to `chunk_size`.
                // Any remainder is flushed after the stream ends.
                let mut output_chunk_builder =
                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
                #[for_await]
                for chunk in stream {
                    for output_chunk in
                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
                    {
                        yield output_chunk
                    }
                }
                if let Some(output_chunk) = output_chunk_builder.consume_all() {
                    yield output_chunk
                }
            } else {
                // Equi-only (or as-of) path: each hash-join chunk is projected and yielded
                // as is.
                let stream = match self.join_type {
                    JoinType::Inner | JoinType::AsOfInner => {
                        HashJoinExecutor::do_inner_join(params)
                    }
                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
                        HashJoinExecutor::do_left_outer_join(params)
                    }
                    JoinType::LeftSemi => HashJoinExecutor::do_left_semi_anti_join::<false>(params),
                    JoinType::LeftAnti => HashJoinExecutor::do_left_semi_anti_join::<true>(params),
                    JoinType::RightOuter
                    | JoinType::RightSemi
                    | JoinType::RightAnti
                    | JoinType::FullOuter => unimplemented!(),
                };
                #[for_await]
                for chunk in stream {
                    yield chunk?.project(&self.output_indices)
                }
            }

            // Release this batch's charge.
            // NOTE(review): this is skipped when an error is propagated with `?` above or the
            // stream is dropped mid-batch. Presumably acceptable because `mem_ctx` is dropped
            // together with the executor — confirm.
            self.mem_ctx.add(-tmp_heap_size);
        }
    }
}