risingwave_batch_executors/executor/join/
distributed_lookup_join.rs1use std::marker::PhantomData;
16use std::mem::swap;
17
18use futures::pin_mut;
19use itertools::Itertools;
20use risingwave_batch::task::ShutdownToken;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
23use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat};
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::scan_range::ScanRange;
30use risingwave_expr::expr::{BoxedExpression, build_from_prost};
31use risingwave_pb::batch_plan::plan_node::NodeBody;
32use risingwave_pb::common::BatchQueryEpoch;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use super::AsOfDesc;
39use crate::error::Result;
40use crate::executor::join::JoinType;
41use crate::executor::{
42 AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
43 ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, unix_timestamp_sec_to_epoch,
44};
45
46pub struct DistributedLookupJoinExecutor<K> {
56 base: LookupJoinBase<K>,
57 _phantom: PhantomData<K>,
58}
59
60impl<K: HashKey> Executor for DistributedLookupJoinExecutor<K> {
61 fn schema(&self) -> &Schema {
62 &self.base.schema
63 }
64
65 fn identity(&self) -> &str {
66 &self.base.identity
67 }
68
69 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
70 Box::new(self.base).do_execute()
71 }
72}
73
74impl<K> DistributedLookupJoinExecutor<K> {
75 pub fn new(base: LookupJoinBase<K>) -> Self {
76 Self {
77 base,
78 _phantom: PhantomData,
79 }
80 }
81}
82
/// Builds a [`DistributedLookupJoinExecutor`] from a `DistributedLookupJoin` plan node via its
/// [`BoxedExecutorBuilder`] implementation.
pub struct DistributedLookupJoinExecutorBuilder {}
84
85impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
86 async fn new_boxed_executor(
87 source: &ExecutorBuilder<'_>,
88 inputs: Vec<BoxedExecutor>,
89 ) -> Result<BoxedExecutor> {
90 let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
91
92 let distributed_lookup_join_node = try_match_expand!(
93 source.plan_node().get_node_body().unwrap(),
94 NodeBody::DistributedLookupJoin
95 )?;
96
97 let as_of = distributed_lookup_join_node
99 .as_of
100 .as_ref()
101 .map(AsOf::try_from)
102 .transpose()?;
103 let query_epoch = as_of
104 .map(|a| {
105 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
106 tracing::debug!(epoch, "time travel");
107 risingwave_pb::common::BatchQueryEpoch {
108 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
109 epoch,
110 )),
111 }
112 })
113 .unwrap_or_else(|| source.epoch());
114
115 let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
116 let condition = match distributed_lookup_join_node.get_condition() {
117 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
118 Err(_) => None,
119 };
120
121 let output_indices: Vec<usize> = distributed_lookup_join_node
122 .get_output_indices()
123 .iter()
124 .map(|&x| x as usize)
125 .collect();
126
127 let outer_side_data_types = outer_side_input.schema().data_types();
128
129 let table_desc = distributed_lookup_join_node.get_inner_side_table_desc()?;
130 let inner_side_column_ids = distributed_lookup_join_node
131 .get_inner_side_column_ids()
132 .to_vec();
133
134 let inner_side_schema = Schema {
135 fields: inner_side_column_ids
136 .iter()
137 .map(|&id| {
138 let column = table_desc
139 .columns
140 .iter()
141 .find(|c| c.column_id == id)
142 .unwrap();
143 Field::from(&ColumnDesc::from(column))
144 })
145 .collect_vec(),
146 };
147
148 let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
149 outer_side_input.schema().fields.clone()
150 } else {
151 [
152 outer_side_input.schema().fields.clone(),
153 inner_side_schema.fields.clone(),
154 ]
155 .concat()
156 };
157
158 let original_schema = Schema { fields };
159 let actual_schema = output_indices
160 .iter()
161 .map(|&idx| original_schema[idx].clone())
162 .collect();
163
164 let mut outer_side_key_idxs = vec![];
165 for outer_side_key in distributed_lookup_join_node.get_outer_side_key() {
166 outer_side_key_idxs.push(*outer_side_key as usize)
167 }
168
169 let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
170 .iter()
171 .map(|&i| outer_side_data_types[i].clone())
172 .collect_vec();
173
174 let lookup_prefix_len: usize =
175 distributed_lookup_join_node.get_lookup_prefix_len() as usize;
176
177 let mut inner_side_key_idxs = vec![];
178 for inner_side_key in distributed_lookup_join_node.get_inner_side_key() {
179 inner_side_key_idxs.push(*inner_side_key as usize)
180 }
181
182 let inner_side_key_types = inner_side_key_idxs
183 .iter()
184 .map(|&i| inner_side_schema.fields[i].data_type.clone())
185 .collect_vec();
186
187 let null_safe = distributed_lookup_join_node.get_null_safe().to_vec();
188
189 let chunk_size = source.context().get_config().developer.chunk_size;
190
191 let asof_desc = distributed_lookup_join_node
192 .asof_desc
193 .map(|desc| AsOfDesc::from_protobuf(&desc))
194 .transpose()?;
195
196 let column_ids = inner_side_column_ids
197 .iter()
198 .copied()
199 .map(ColumnId::from)
200 .collect();
201
202 let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
204
205 dispatch_state_store!(source.context().state_store(), state_store, {
206 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
207 let inner_side_builder = InnerSideExecutorBuilder::new(
208 outer_side_key_types,
209 inner_side_key_types.clone(),
210 lookup_prefix_len,
211 query_epoch,
212 vec![],
213 table,
214 chunk_size,
215 );
216
217 let identity = source.plan_node().get_identity().clone();
218
219 Ok(DistributedLookupJoinExecutorArgs {
220 join_type,
221 condition,
222 outer_side_input,
223 outer_side_data_types,
224 outer_side_key_idxs,
225 inner_side_builder: Box::new(inner_side_builder),
226 inner_side_key_types,
227 inner_side_key_idxs,
228 null_safe,
229 lookup_prefix_len,
230 chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
231 schema: actual_schema,
232 output_indices,
233 chunk_size,
234 asof_desc,
235 identity: identity.clone(),
236 shutdown_rx: source.shutdown_rx().clone(),
237 mem_ctx: source.context().create_executor_mem_context(&identity),
238 }
239 .dispatch())
240 })
241 }
242}
243
244struct DistributedLookupJoinExecutorArgs {
245 join_type: JoinType,
246 condition: Option<BoxedExpression>,
247 outer_side_input: BoxedExecutor,
248 outer_side_data_types: Vec<DataType>,
249 outer_side_key_idxs: Vec<usize>,
250 inner_side_builder: Box<dyn LookupExecutorBuilder>,
251 inner_side_key_types: Vec<DataType>,
252 inner_side_key_idxs: Vec<usize>,
253 null_safe: Vec<bool>,
254 lookup_prefix_len: usize,
255 chunk_builder: DataChunkBuilder,
256 schema: Schema,
257 output_indices: Vec<usize>,
258 chunk_size: usize,
259 asof_desc: Option<AsOfDesc>,
260 identity: String,
261 shutdown_rx: ShutdownToken,
262 mem_ctx: MemoryContext,
263}
264
265impl HashKeyDispatcher for DistributedLookupJoinExecutorArgs {
266 type Output = BoxedExecutor;
267
268 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
269 Box::new(DistributedLookupJoinExecutor::<K>::new(
270 LookupJoinBase::<K> {
271 join_type: self.join_type,
272 condition: self.condition,
273 outer_side_input: self.outer_side_input,
274 outer_side_data_types: self.outer_side_data_types,
275 outer_side_key_idxs: self.outer_side_key_idxs,
276 inner_side_builder: self.inner_side_builder,
277 inner_side_key_types: self.inner_side_key_types,
278 inner_side_key_idxs: self.inner_side_key_idxs,
279 null_safe: self.null_safe,
280 lookup_prefix_len: self.lookup_prefix_len,
281 chunk_builder: self.chunk_builder,
282 schema: self.schema,
283 output_indices: self.output_indices,
284 chunk_size: self.chunk_size,
285 asof_desc: self.asof_desc,
286 identity: self.identity,
287 shutdown_rx: self.shutdown_rx,
288 mem_ctx: self.mem_ctx,
289 _phantom: PhantomData,
290 },
291 ))
292 }
293
294 fn data_types(&self) -> &[DataType] {
295 &self.inner_side_key_types
296 }
297}
298
299struct InnerSideExecutorBuilder<S: StateStore> {
301 outer_side_key_types: Vec<DataType>,
302 inner_side_key_types: Vec<DataType>,
303 lookup_prefix_len: usize,
304 epoch: BatchQueryEpoch,
305 row_list: Vec<OwnedRow>,
306 table: BatchTable<S>,
307 chunk_size: usize,
308}
309
310impl<S: StateStore> InnerSideExecutorBuilder<S> {
311 fn new(
312 outer_side_key_types: Vec<DataType>,
313 inner_side_key_types: Vec<DataType>,
314 lookup_prefix_len: usize,
315 epoch: BatchQueryEpoch,
316 row_list: Vec<OwnedRow>,
317 table: BatchTable<S>,
318 chunk_size: usize,
319 ) -> Self {
320 Self {
321 outer_side_key_types,
322 inner_side_key_types,
323 lookup_prefix_len,
324 epoch,
325 row_list,
326 table,
327 chunk_size,
328 }
329 }
330}
331
332#[async_trait::async_trait]
333impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
334 fn reset(&mut self) {
335 }
337
338 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
340 let mut scan_range = ScanRange::full_table_scan();
341
342 for ((datum, outer_type), inner_type) in key_datums
343 .into_iter()
344 .zip_eq_fast(
345 self.outer_side_key_types
346 .iter()
347 .take(self.lookup_prefix_len),
348 )
349 .zip_eq_fast(
350 self.inner_side_key_types
351 .iter()
352 .take(self.lookup_prefix_len),
353 )
354 {
355 let datum = if inner_type == outer_type {
356 datum
357 } else {
358 bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
359 };
360
361 scan_range.eq_conds.push(datum);
362 }
363
364 let pk_prefix = OwnedRow::new(scan_range.eq_conds);
365
366 if self.lookup_prefix_len == self.table.pk_indices().len() {
367 let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
368
369 if let Some(row) = row {
370 self.row_list.push(row);
371 }
372 } else {
373 let iter = self
374 .table
375 .batch_iter_with_pk_bounds(
376 self.epoch.into(),
377 &pk_prefix,
378 ..,
379 false,
380 PrefetchOptions::default(),
381 )
382 .await?;
383
384 pin_mut!(iter);
385 while let Some(row) = iter.next_row().await? {
386 self.row_list.push(row);
387 }
388 }
389
390 Ok(())
391 }
392
393 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
395 let mut data_chunk_builder =
396 DataChunkBuilder::new(self.table.schema().data_types(), self.chunk_size);
397 let mut chunk_list = Vec::new();
398
399 let mut new_row_list = vec![];
400 swap(&mut new_row_list, &mut self.row_list);
401
402 for row in new_row_list {
403 if let Some(chunk) = data_chunk_builder.append_one_row(row) {
404 chunk_list.push(chunk);
405 }
406 }
407 if let Some(chunk) = data_chunk_builder.consume_all() {
408 chunk_list.push(chunk);
409 }
410
411 Ok(Box::new(BufferChunkExecutor::new(
412 self.table.schema().clone(),
413 chunk_list,
414 )))
415 }
416}