risingwave_batch_executors/executor/join/
distributed_lookup_join.rs1use std::marker::PhantomData;
16use std::mem::swap;
17
18use futures::pin_mut;
19use itertools::Itertools;
20use risingwave_batch::task::ShutdownToken;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
23use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat};
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::scan_range::ScanRange;
30use risingwave_expr::expr::{BoxedExpression, build_from_prost};
31use risingwave_pb::batch_plan::plan_node::NodeBody;
32use risingwave_pb::common::BatchQueryEpoch;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use super::AsOfDesc;
39use crate::error::Result;
40use crate::executor::join::JoinType;
41use crate::executor::{
42 AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
43 ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, unix_timestamp_sec_to_epoch,
44};
45
46pub struct DistributedLookupJoinExecutor<K, S: StateStore> {
56 base: LookupJoinBase<K, InnerSideExecutorBuilder<S>>,
57}
58
59impl<K: HashKey, S: StateStore> Executor for DistributedLookupJoinExecutor<K, S> {
60 fn schema(&self) -> &Schema {
61 &self.base.schema
62 }
63
64 fn identity(&self) -> &str {
65 &self.base.identity
66 }
67
68 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
69 Box::new(self.base).do_execute()
70 }
71}
72
73impl<K, S: StateStore> DistributedLookupJoinExecutor<K, S> {
74 fn new(base: LookupJoinBase<K, InnerSideExecutorBuilder<S>>) -> Self {
75 Self { base }
76 }
77}
78
79pub struct DistributedLookupJoinExecutorBuilder {}
80
81impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
82 async fn new_boxed_executor(
83 source: &ExecutorBuilder<'_>,
84 inputs: Vec<BoxedExecutor>,
85 ) -> Result<BoxedExecutor> {
86 let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
87
88 let distributed_lookup_join_node = try_match_expand!(
89 source.plan_node().get_node_body().unwrap(),
90 NodeBody::DistributedLookupJoin
91 )?;
92
93 let as_of = distributed_lookup_join_node
95 .as_of
96 .as_ref()
97 .map(AsOf::try_from)
98 .transpose()?;
99 let query_epoch = as_of
100 .map(|a| {
101 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
102 tracing::debug!(epoch, "time travel");
103 risingwave_pb::common::BatchQueryEpoch {
104 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
105 epoch,
106 )),
107 }
108 })
109 .unwrap_or_else(|| source.epoch());
110
111 let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
112 let condition = match distributed_lookup_join_node.get_condition() {
113 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
114 Err(_) => None,
115 };
116
117 let output_indices: Vec<usize> = distributed_lookup_join_node
118 .get_output_indices()
119 .iter()
120 .map(|&x| x as usize)
121 .collect();
122
123 let outer_side_data_types = outer_side_input.schema().data_types();
124
125 let table_desc = distributed_lookup_join_node.get_inner_side_table_desc()?;
126 let inner_side_column_ids = distributed_lookup_join_node
127 .get_inner_side_column_ids()
128 .to_vec();
129
130 let inner_side_schema = Schema {
131 fields: inner_side_column_ids
132 .iter()
133 .map(|&id| {
134 let column = table_desc
135 .columns
136 .iter()
137 .find(|c| c.column_id == id)
138 .unwrap();
139 Field::from(&ColumnDesc::from(column))
140 })
141 .collect_vec(),
142 };
143
144 let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
145 outer_side_input.schema().fields.clone()
146 } else {
147 [
148 outer_side_input.schema().fields.clone(),
149 inner_side_schema.fields.clone(),
150 ]
151 .concat()
152 };
153
154 let original_schema = Schema { fields };
155 let actual_schema = output_indices
156 .iter()
157 .map(|&idx| original_schema[idx].clone())
158 .collect();
159
160 let mut outer_side_key_idxs = vec![];
161 for outer_side_key in distributed_lookup_join_node.get_outer_side_key() {
162 outer_side_key_idxs.push(*outer_side_key as usize)
163 }
164
165 let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
166 .iter()
167 .map(|&i| outer_side_data_types[i].clone())
168 .collect_vec();
169
170 let lookup_prefix_len: usize =
171 distributed_lookup_join_node.get_lookup_prefix_len() as usize;
172
173 let mut inner_side_key_idxs = vec![];
174 for inner_side_key in distributed_lookup_join_node.get_inner_side_key() {
175 inner_side_key_idxs.push(*inner_side_key as usize)
176 }
177
178 let inner_side_key_types = inner_side_key_idxs
179 .iter()
180 .map(|&i| inner_side_schema.fields[i].data_type.clone())
181 .collect_vec();
182
183 let null_safe = distributed_lookup_join_node.get_null_safe().to_vec();
184
185 let chunk_size = source.context().get_config().developer.chunk_size;
186
187 let asof_desc = distributed_lookup_join_node
188 .asof_desc
189 .map(|desc| AsOfDesc::from_protobuf(&desc))
190 .transpose()?;
191
192 let column_ids = inner_side_column_ids
193 .iter()
194 .copied()
195 .map(ColumnId::from)
196 .collect();
197
198 let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
200
201 dispatch_state_store!(source.context().state_store(), state_store, {
202 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
203 let inner_side_builder = InnerSideExecutorBuilder::new(
204 outer_side_key_types,
205 inner_side_key_types.clone(),
206 lookup_prefix_len,
207 query_epoch,
208 vec![],
209 table,
210 chunk_size,
211 );
212
213 let identity = source.plan_node().get_identity().clone();
214
215 Ok(DistributedLookupJoinExecutorArgs {
216 join_type,
217 condition,
218 outer_side_input,
219 outer_side_data_types,
220 outer_side_key_idxs,
221 inner_side_builder,
222 inner_side_key_types,
223 inner_side_key_idxs,
224 null_safe,
225 lookup_prefix_len,
226 chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
227 schema: actual_schema,
228 output_indices,
229 chunk_size,
230 asof_desc,
231 identity: identity.clone(),
232 shutdown_rx: source.shutdown_rx().clone(),
233 mem_ctx: source.context().create_executor_mem_context(&identity),
234 }
235 .dispatch())
236 })
237 }
238}
239
240struct DistributedLookupJoinExecutorArgs<S: StateStore> {
241 join_type: JoinType,
242 condition: Option<BoxedExpression>,
243 outer_side_input: BoxedExecutor,
244 outer_side_data_types: Vec<DataType>,
245 outer_side_key_idxs: Vec<usize>,
246 inner_side_builder: InnerSideExecutorBuilder<S>,
247 inner_side_key_types: Vec<DataType>,
248 inner_side_key_idxs: Vec<usize>,
249 null_safe: Vec<bool>,
250 lookup_prefix_len: usize,
251 chunk_builder: DataChunkBuilder,
252 schema: Schema,
253 output_indices: Vec<usize>,
254 chunk_size: usize,
255 asof_desc: Option<AsOfDesc>,
256 identity: String,
257 shutdown_rx: ShutdownToken,
258 mem_ctx: MemoryContext,
259}
260
261impl<S: StateStore> HashKeyDispatcher for DistributedLookupJoinExecutorArgs<S> {
262 type Output = BoxedExecutor;
263
264 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
265 Box::new(DistributedLookupJoinExecutor::<K, S>::new(LookupJoinBase {
266 join_type: self.join_type,
267 condition: self.condition,
268 outer_side_input: self.outer_side_input,
269 outer_side_data_types: self.outer_side_data_types,
270 outer_side_key_idxs: self.outer_side_key_idxs,
271 inner_side_builder: self.inner_side_builder,
272 inner_side_key_types: self.inner_side_key_types,
273 inner_side_key_idxs: self.inner_side_key_idxs,
274 null_safe: self.null_safe,
275 lookup_prefix_len: self.lookup_prefix_len,
276 chunk_builder: self.chunk_builder,
277 schema: self.schema,
278 output_indices: self.output_indices,
279 chunk_size: self.chunk_size,
280 asof_desc: self.asof_desc,
281 identity: self.identity,
282 shutdown_rx: self.shutdown_rx,
283 mem_ctx: self.mem_ctx,
284 _phantom: PhantomData,
285 }))
286 }
287
288 fn data_types(&self) -> &[DataType] {
289 &self.inner_side_key_types
290 }
291}
292
293struct InnerSideExecutorBuilder<S: StateStore> {
295 outer_side_key_types: Vec<DataType>,
296 inner_side_key_types: Vec<DataType>,
297 lookup_prefix_len: usize,
298 epoch: BatchQueryEpoch,
299 row_list: Vec<OwnedRow>,
300 table: BatchTable<S>,
301 chunk_size: usize,
302}
303
304impl<S: StateStore> InnerSideExecutorBuilder<S> {
305 fn new(
306 outer_side_key_types: Vec<DataType>,
307 inner_side_key_types: Vec<DataType>,
308 lookup_prefix_len: usize,
309 epoch: BatchQueryEpoch,
310 row_list: Vec<OwnedRow>,
311 table: BatchTable<S>,
312 chunk_size: usize,
313 ) -> Self {
314 Self {
315 outer_side_key_types,
316 inner_side_key_types,
317 lookup_prefix_len,
318 epoch,
319 row_list,
320 table,
321 chunk_size,
322 }
323 }
324}
325
326impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
327 fn reset(&mut self) {
328 }
330
331 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
333 let mut scan_range = ScanRange::full_table_scan();
334
335 for ((datum, outer_type), inner_type) in key_datums
336 .into_iter()
337 .zip_eq_fast(
338 self.outer_side_key_types
339 .iter()
340 .take(self.lookup_prefix_len),
341 )
342 .zip_eq_fast(
343 self.inner_side_key_types
344 .iter()
345 .take(self.lookup_prefix_len),
346 )
347 {
348 let datum = if inner_type == outer_type {
349 datum
350 } else {
351 bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
352 };
353
354 scan_range.eq_conds.push(datum);
355 }
356
357 let pk_prefix = OwnedRow::new(scan_range.eq_conds);
358
359 if self.lookup_prefix_len == self.table.pk_indices().len() {
360 let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
361
362 if let Some(row) = row {
363 self.row_list.push(row);
364 }
365 } else {
366 let iter = self
367 .table
368 .batch_iter_with_pk_bounds(
369 self.epoch.into(),
370 &pk_prefix,
371 ..,
372 false,
373 PrefetchOptions::default(),
374 )
375 .await?;
376
377 pin_mut!(iter);
378 while let Some(row) = iter.next_row().await? {
379 self.row_list.push(row);
380 }
381 }
382
383 Ok(())
384 }
385
386 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
388 let mut data_chunk_builder =
389 DataChunkBuilder::new(self.table.schema().data_types(), self.chunk_size);
390 let mut chunk_list = Vec::new();
391
392 let mut new_row_list = vec![];
393 swap(&mut new_row_list, &mut self.row_list);
394
395 for row in new_row_list {
396 if let Some(chunk) = data_chunk_builder.append_one_row(row) {
397 chunk_list.push(chunk);
398 }
399 }
400 if let Some(chunk) = data_chunk_builder.consume_all() {
401 chunk_list.push(chunk);
402 }
403
404 Ok(Box::new(BufferChunkExecutor::new(
405 self.table.schema().clone(),
406 chunk_list,
407 )))
408 }
409}