// risingwave_batch_executors/executor/join/distributed_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::mem::swap;
17
18use futures::pin_mut;
19use itertools::Itertools;
20use risingwave_batch::task::ShutdownToken;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
23use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat};
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::scan_range::ScanRange;
30use risingwave_expr::expr::{BoxedExpression, build_from_prost};
31use risingwave_pb::batch_plan::plan_node::NodeBody;
32use risingwave_pb::common::BatchQueryEpoch;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use super::AsOfDesc;
39use crate::error::Result;
40use crate::executor::join::JoinType;
41use crate::executor::{
42    AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
43    ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, unix_timestamp_sec_to_epoch,
44};
45
46/// Distributed Lookup Join Executor.
47/// High level Execution flow:
48/// Repeat 1-3:
49///   1. Read N rows from outer side input and send keys to inner side builder after deduplication.
50///   2. Inner side input lookups inner side table with keys and builds hash map.
51///   3. Outer side rows join each inner side rows by probing the hash map.
52///
53/// Distributed lookup join already scheduled to its inner side corresponding compute node, so that
54/// it can just lookup the compute node locally without sending RPCs to other compute nodes.
55pub struct DistributedLookupJoinExecutor<K, S: StateStore> {
56    base: LookupJoinBase<K, InnerSideExecutorBuilder<S>>,
57}
58
59impl<K: HashKey, S: StateStore> Executor for DistributedLookupJoinExecutor<K, S> {
60    fn schema(&self) -> &Schema {
61        &self.base.schema
62    }
63
64    fn identity(&self) -> &str {
65        &self.base.identity
66    }
67
68    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
69        Box::new(self.base).do_execute()
70    }
71}
72
73impl<K, S: StateStore> DistributedLookupJoinExecutor<K, S> {
74    fn new(base: LookupJoinBase<K, InnerSideExecutorBuilder<S>>) -> Self {
75        Self { base }
76    }
77}
78
/// Builds a [`DistributedLookupJoinExecutor`] from a `DistributedLookupJoin` plan node through its
/// [`BoxedExecutorBuilder`] implementation. The builder itself carries no state.
pub struct DistributedLookupJoinExecutorBuilder {}
80
81impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
82    async fn new_boxed_executor(
83        source: &ExecutorBuilder<'_>,
84        inputs: Vec<BoxedExecutor>,
85    ) -> Result<BoxedExecutor> {
86        let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
87
88        let distributed_lookup_join_node = try_match_expand!(
89            source.plan_node().get_node_body().unwrap(),
90            NodeBody::DistributedLookupJoin
91        )?;
92
93        // as_of takes precedence
94        let as_of = distributed_lookup_join_node
95            .as_of
96            .as_ref()
97            .map(AsOf::try_from)
98            .transpose()?;
99        let query_epoch = as_of
100            .map(|a| {
101                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
102                tracing::debug!(epoch, "time travel");
103                risingwave_pb::common::BatchQueryEpoch {
104                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
105                        epoch,
106                    )),
107                }
108            })
109            .unwrap_or_else(|| source.epoch());
110
111        let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
112        let condition = match distributed_lookup_join_node.get_condition() {
113            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
114            Err(_) => None,
115        };
116
117        let output_indices: Vec<usize> = distributed_lookup_join_node
118            .get_output_indices()
119            .iter()
120            .map(|&x| x as usize)
121            .collect();
122
123        let outer_side_data_types = outer_side_input.schema().data_types();
124
125        let table_desc = distributed_lookup_join_node.get_inner_side_table_desc()?;
126        let inner_side_column_ids = distributed_lookup_join_node
127            .get_inner_side_column_ids()
128            .to_vec();
129
130        let inner_side_schema = Schema {
131            fields: inner_side_column_ids
132                .iter()
133                .map(|&id| {
134                    let column = table_desc
135                        .columns
136                        .iter()
137                        .find(|c| c.column_id == id)
138                        .unwrap();
139                    Field::from(&ColumnDesc::from(column))
140                })
141                .collect_vec(),
142        };
143
144        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
145            outer_side_input.schema().fields.clone()
146        } else {
147            [
148                outer_side_input.schema().fields.clone(),
149                inner_side_schema.fields.clone(),
150            ]
151            .concat()
152        };
153
154        let original_schema = Schema { fields };
155        let actual_schema = output_indices
156            .iter()
157            .map(|&idx| original_schema[idx].clone())
158            .collect();
159
160        let mut outer_side_key_idxs = vec![];
161        for outer_side_key in distributed_lookup_join_node.get_outer_side_key() {
162            outer_side_key_idxs.push(*outer_side_key as usize)
163        }
164
165        let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
166            .iter()
167            .map(|&i| outer_side_data_types[i].clone())
168            .collect_vec();
169
170        let lookup_prefix_len: usize =
171            distributed_lookup_join_node.get_lookup_prefix_len() as usize;
172
173        let mut inner_side_key_idxs = vec![];
174        for inner_side_key in distributed_lookup_join_node.get_inner_side_key() {
175            inner_side_key_idxs.push(*inner_side_key as usize)
176        }
177
178        let inner_side_key_types = inner_side_key_idxs
179            .iter()
180            .map(|&i| inner_side_schema.fields[i].data_type.clone())
181            .collect_vec();
182
183        let null_safe = distributed_lookup_join_node.get_null_safe().to_vec();
184
185        let chunk_size = source.context().get_config().developer.chunk_size;
186
187        let asof_desc = distributed_lookup_join_node
188            .asof_desc
189            .map(|desc| AsOfDesc::from_protobuf(&desc))
190            .transpose()?;
191
192        let column_ids = inner_side_column_ids
193            .iter()
194            .copied()
195            .map(ColumnId::from)
196            .collect();
197
198        // Lookup Join always contains distribution key, so we don't need vnode bitmap
199        let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
200
201        dispatch_state_store!(source.context().state_store(), state_store, {
202            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
203            let inner_side_builder = InnerSideExecutorBuilder::new(
204                outer_side_key_types,
205                inner_side_key_types.clone(),
206                lookup_prefix_len,
207                query_epoch,
208                vec![],
209                table,
210                chunk_size,
211            );
212
213            let identity = source.plan_node().get_identity().clone();
214
215            Ok(DistributedLookupJoinExecutorArgs {
216                join_type,
217                condition,
218                outer_side_input,
219                outer_side_data_types,
220                outer_side_key_idxs,
221                inner_side_builder,
222                inner_side_key_types,
223                inner_side_key_idxs,
224                null_safe,
225                lookup_prefix_len,
226                chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
227                schema: actual_schema,
228                output_indices,
229                chunk_size,
230                asof_desc,
231                identity: identity.clone(),
232                shutdown_rx: source.shutdown_rx().clone(),
233                mem_ctx: source.context().create_executor_mem_context(&identity),
234            }
235            .dispatch())
236        })
237    }
238}
239
240struct DistributedLookupJoinExecutorArgs<S: StateStore> {
241    join_type: JoinType,
242    condition: Option<BoxedExpression>,
243    outer_side_input: BoxedExecutor,
244    outer_side_data_types: Vec<DataType>,
245    outer_side_key_idxs: Vec<usize>,
246    inner_side_builder: InnerSideExecutorBuilder<S>,
247    inner_side_key_types: Vec<DataType>,
248    inner_side_key_idxs: Vec<usize>,
249    null_safe: Vec<bool>,
250    lookup_prefix_len: usize,
251    chunk_builder: DataChunkBuilder,
252    schema: Schema,
253    output_indices: Vec<usize>,
254    chunk_size: usize,
255    asof_desc: Option<AsOfDesc>,
256    identity: String,
257    shutdown_rx: ShutdownToken,
258    mem_ctx: MemoryContext,
259}
260
261impl<S: StateStore> HashKeyDispatcher for DistributedLookupJoinExecutorArgs<S> {
262    type Output = BoxedExecutor;
263
264    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
265        Box::new(DistributedLookupJoinExecutor::<K, S>::new(LookupJoinBase {
266            join_type: self.join_type,
267            condition: self.condition,
268            outer_side_input: self.outer_side_input,
269            outer_side_data_types: self.outer_side_data_types,
270            outer_side_key_idxs: self.outer_side_key_idxs,
271            inner_side_builder: self.inner_side_builder,
272            inner_side_key_types: self.inner_side_key_types,
273            inner_side_key_idxs: self.inner_side_key_idxs,
274            null_safe: self.null_safe,
275            lookup_prefix_len: self.lookup_prefix_len,
276            chunk_builder: self.chunk_builder,
277            schema: self.schema,
278            output_indices: self.output_indices,
279            chunk_size: self.chunk_size,
280            asof_desc: self.asof_desc,
281            identity: self.identity,
282            shutdown_rx: self.shutdown_rx,
283            mem_ctx: self.mem_ctx,
284            _phantom: PhantomData,
285        }))
286    }
287
288    fn data_types(&self) -> &[DataType] {
289        &self.inner_side_key_types
290    }
291}
292
293/// Inner side executor builder for the `DistributedLookupJoinExecutor`
294struct InnerSideExecutorBuilder<S: StateStore> {
295    outer_side_key_types: Vec<DataType>,
296    inner_side_key_types: Vec<DataType>,
297    lookup_prefix_len: usize,
298    epoch: BatchQueryEpoch,
299    row_list: Vec<OwnedRow>,
300    table: BatchTable<S>,
301    chunk_size: usize,
302}
303
304impl<S: StateStore> InnerSideExecutorBuilder<S> {
305    fn new(
306        outer_side_key_types: Vec<DataType>,
307        inner_side_key_types: Vec<DataType>,
308        lookup_prefix_len: usize,
309        epoch: BatchQueryEpoch,
310        row_list: Vec<OwnedRow>,
311        table: BatchTable<S>,
312        chunk_size: usize,
313    ) -> Self {
314        Self {
315            outer_side_key_types,
316            inner_side_key_types,
317            lookup_prefix_len,
318            epoch,
319            row_list,
320            table,
321            chunk_size,
322        }
323    }
324}
325
326impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
327    fn reset(&mut self) {
328        // PASS
329    }
330
331    /// Fetch row from inner side table by the scan range added.
332    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
333        let mut scan_range = ScanRange::full_table_scan();
334
335        for ((datum, outer_type), inner_type) in key_datums
336            .into_iter()
337            .zip_eq_fast(
338                self.outer_side_key_types
339                    .iter()
340                    .take(self.lookup_prefix_len),
341            )
342            .zip_eq_fast(
343                self.inner_side_key_types
344                    .iter()
345                    .take(self.lookup_prefix_len),
346            )
347        {
348            let datum = if inner_type == outer_type {
349                datum
350            } else {
351                bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
352            };
353
354            scan_range.eq_conds.push(datum);
355        }
356
357        let pk_prefix = OwnedRow::new(scan_range.eq_conds);
358
359        if self.lookup_prefix_len == self.table.pk_indices().len() {
360            let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
361
362            if let Some(row) = row {
363                self.row_list.push(row);
364            }
365        } else {
366            let iter = self
367                .table
368                .batch_iter_with_pk_bounds(
369                    self.epoch.into(),
370                    &pk_prefix,
371                    ..,
372                    false,
373                    PrefetchOptions::default(),
374                )
375                .await?;
376
377            pin_mut!(iter);
378            while let Some(row) = iter.next_row().await? {
379                self.row_list.push(row);
380            }
381        }
382
383        Ok(())
384    }
385
386    /// Build a `BufferChunkExecutor` to return all its rows fetched by `add_scan_range` before.
387    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
388        let mut data_chunk_builder =
389            DataChunkBuilder::new(self.table.schema().data_types(), self.chunk_size);
390        let mut chunk_list = Vec::new();
391
392        let mut new_row_list = vec![];
393        swap(&mut new_row_list, &mut self.row_list);
394
395        for row in new_row_list {
396            if let Some(chunk) = data_chunk_builder.append_one_row(row) {
397                chunk_list.push(chunk);
398            }
399        }
400        if let Some(chunk) = data_chunk_builder.consume_all() {
401            chunk_list.push(chunk);
402        }
403
404        Ok(Box::new(BufferChunkExecutor::new(
405            self.table.schema().clone(),
406            chunk_list,
407        )))
408    }
409}