risingwave_batch_executors/executor/join/
distributed_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::mem::swap;
17
18use futures::pin_mut;
19use itertools::Itertools;
20use risingwave_batch::task::ShutdownToken;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
23use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat};
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::scan_range::ScanRange;
30use risingwave_expr::expr::{BoxedExpression, build_from_prost};
31use risingwave_pb::batch_plan::plan_node::NodeBody;
32use risingwave_pb::common::BatchQueryEpoch;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use super::AsOfDesc;
39use crate::error::Result;
40use crate::executor::join::JoinType;
41use crate::executor::{
42    AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
43    ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, unix_timestamp_sec_to_epoch,
44};
45
46/// Distributed Lookup Join Executor.
47/// High level Execution flow:
48/// Repeat 1-3:
49///   1. Read N rows from outer side input and send keys to inner side builder after deduplication.
50///   2. Inner side input lookups inner side table with keys and builds hash map.
51///   3. Outer side rows join each inner side rows by probing the hash map.
52///
53/// Distributed lookup join already scheduled to its inner side corresponding compute node, so that
54/// it can just lookup the compute node locally without sending RPCs to other compute nodes.
55pub struct DistributedLookupJoinExecutor<K> {
56    base: LookupJoinBase<K>,
57    _phantom: PhantomData<K>,
58}
59
60impl<K: HashKey> Executor for DistributedLookupJoinExecutor<K> {
61    fn schema(&self) -> &Schema {
62        &self.base.schema
63    }
64
65    fn identity(&self) -> &str {
66        &self.base.identity
67    }
68
69    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
70        Box::new(self.base).do_execute()
71    }
72}
73
74impl<K> DistributedLookupJoinExecutor<K> {
75    pub fn new(base: LookupJoinBase<K>) -> Self {
76        Self {
77            base,
78            _phantom: PhantomData,
79        }
80    }
81}
82
/// Stateless builder that turns a `DistributedLookupJoin` plan node into a boxed executor via its
/// [`BoxedExecutorBuilder`] implementation.
pub struct DistributedLookupJoinExecutorBuilder {}
84
85impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
86    async fn new_boxed_executor(
87        source: &ExecutorBuilder<'_>,
88        inputs: Vec<BoxedExecutor>,
89    ) -> Result<BoxedExecutor> {
90        let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
91
92        let distributed_lookup_join_node = try_match_expand!(
93            source.plan_node().get_node_body().unwrap(),
94            NodeBody::DistributedLookupJoin
95        )?;
96
97        // as_of takes precedence
98        let as_of = distributed_lookup_join_node
99            .as_of
100            .as_ref()
101            .map(AsOf::try_from)
102            .transpose()?;
103        let query_epoch = as_of
104            .map(|a| {
105                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
106                tracing::debug!(epoch, "time travel");
107                risingwave_pb::common::BatchQueryEpoch {
108                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
109                        epoch,
110                    )),
111                }
112            })
113            .unwrap_or_else(|| source.epoch());
114
115        let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
116        let condition = match distributed_lookup_join_node.get_condition() {
117            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
118            Err(_) => None,
119        };
120
121        let output_indices: Vec<usize> = distributed_lookup_join_node
122            .get_output_indices()
123            .iter()
124            .map(|&x| x as usize)
125            .collect();
126
127        let outer_side_data_types = outer_side_input.schema().data_types();
128
129        let table_desc = distributed_lookup_join_node.get_inner_side_table_desc()?;
130        let inner_side_column_ids = distributed_lookup_join_node
131            .get_inner_side_column_ids()
132            .to_vec();
133
134        let inner_side_schema = Schema {
135            fields: inner_side_column_ids
136                .iter()
137                .map(|&id| {
138                    let column = table_desc
139                        .columns
140                        .iter()
141                        .find(|c| c.column_id == id)
142                        .unwrap();
143                    Field::from(&ColumnDesc::from(column))
144                })
145                .collect_vec(),
146        };
147
148        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
149            outer_side_input.schema().fields.clone()
150        } else {
151            [
152                outer_side_input.schema().fields.clone(),
153                inner_side_schema.fields.clone(),
154            ]
155            .concat()
156        };
157
158        let original_schema = Schema { fields };
159        let actual_schema = output_indices
160            .iter()
161            .map(|&idx| original_schema[idx].clone())
162            .collect();
163
164        let mut outer_side_key_idxs = vec![];
165        for outer_side_key in distributed_lookup_join_node.get_outer_side_key() {
166            outer_side_key_idxs.push(*outer_side_key as usize)
167        }
168
169        let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
170            .iter()
171            .map(|&i| outer_side_data_types[i].clone())
172            .collect_vec();
173
174        let lookup_prefix_len: usize =
175            distributed_lookup_join_node.get_lookup_prefix_len() as usize;
176
177        let mut inner_side_key_idxs = vec![];
178        for inner_side_key in distributed_lookup_join_node.get_inner_side_key() {
179            inner_side_key_idxs.push(*inner_side_key as usize)
180        }
181
182        let inner_side_key_types = inner_side_key_idxs
183            .iter()
184            .map(|&i| inner_side_schema.fields[i].data_type.clone())
185            .collect_vec();
186
187        let null_safe = distributed_lookup_join_node.get_null_safe().to_vec();
188
189        let chunk_size = source.context().get_config().developer.chunk_size;
190
191        let asof_desc = distributed_lookup_join_node
192            .asof_desc
193            .map(|desc| AsOfDesc::from_protobuf(&desc))
194            .transpose()?;
195
196        let column_ids = inner_side_column_ids
197            .iter()
198            .copied()
199            .map(ColumnId::from)
200            .collect();
201
202        // Lookup Join always contains distribution key, so we don't need vnode bitmap
203        let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
204
205        dispatch_state_store!(source.context().state_store(), state_store, {
206            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
207            let inner_side_builder = InnerSideExecutorBuilder::new(
208                outer_side_key_types,
209                inner_side_key_types.clone(),
210                lookup_prefix_len,
211                query_epoch,
212                vec![],
213                table,
214                chunk_size,
215            );
216
217            let identity = source.plan_node().get_identity().clone();
218
219            Ok(DistributedLookupJoinExecutorArgs {
220                join_type,
221                condition,
222                outer_side_input,
223                outer_side_data_types,
224                outer_side_key_idxs,
225                inner_side_builder: Box::new(inner_side_builder),
226                inner_side_key_types,
227                inner_side_key_idxs,
228                null_safe,
229                lookup_prefix_len,
230                chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
231                schema: actual_schema,
232                output_indices,
233                chunk_size,
234                asof_desc,
235                identity: identity.clone(),
236                shutdown_rx: source.shutdown_rx().clone(),
237                mem_ctx: source.context().create_executor_mem_context(&identity),
238            }
239            .dispatch())
240        })
241    }
242}
243
244struct DistributedLookupJoinExecutorArgs {
245    join_type: JoinType,
246    condition: Option<BoxedExpression>,
247    outer_side_input: BoxedExecutor,
248    outer_side_data_types: Vec<DataType>,
249    outer_side_key_idxs: Vec<usize>,
250    inner_side_builder: Box<dyn LookupExecutorBuilder>,
251    inner_side_key_types: Vec<DataType>,
252    inner_side_key_idxs: Vec<usize>,
253    null_safe: Vec<bool>,
254    lookup_prefix_len: usize,
255    chunk_builder: DataChunkBuilder,
256    schema: Schema,
257    output_indices: Vec<usize>,
258    chunk_size: usize,
259    asof_desc: Option<AsOfDesc>,
260    identity: String,
261    shutdown_rx: ShutdownToken,
262    mem_ctx: MemoryContext,
263}
264
265impl HashKeyDispatcher for DistributedLookupJoinExecutorArgs {
266    type Output = BoxedExecutor;
267
268    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
269        Box::new(DistributedLookupJoinExecutor::<K>::new(
270            LookupJoinBase::<K> {
271                join_type: self.join_type,
272                condition: self.condition,
273                outer_side_input: self.outer_side_input,
274                outer_side_data_types: self.outer_side_data_types,
275                outer_side_key_idxs: self.outer_side_key_idxs,
276                inner_side_builder: self.inner_side_builder,
277                inner_side_key_types: self.inner_side_key_types,
278                inner_side_key_idxs: self.inner_side_key_idxs,
279                null_safe: self.null_safe,
280                lookup_prefix_len: self.lookup_prefix_len,
281                chunk_builder: self.chunk_builder,
282                schema: self.schema,
283                output_indices: self.output_indices,
284                chunk_size: self.chunk_size,
285                asof_desc: self.asof_desc,
286                identity: self.identity,
287                shutdown_rx: self.shutdown_rx,
288                mem_ctx: self.mem_ctx,
289                _phantom: PhantomData,
290            },
291        ))
292    }
293
294    fn data_types(&self) -> &[DataType] {
295        &self.inner_side_key_types
296    }
297}
298
299/// Inner side executor builder for the `DistributedLookupJoinExecutor`
300struct InnerSideExecutorBuilder<S: StateStore> {
301    outer_side_key_types: Vec<DataType>,
302    inner_side_key_types: Vec<DataType>,
303    lookup_prefix_len: usize,
304    epoch: BatchQueryEpoch,
305    row_list: Vec<OwnedRow>,
306    table: BatchTable<S>,
307    chunk_size: usize,
308}
309
310impl<S: StateStore> InnerSideExecutorBuilder<S> {
311    fn new(
312        outer_side_key_types: Vec<DataType>,
313        inner_side_key_types: Vec<DataType>,
314        lookup_prefix_len: usize,
315        epoch: BatchQueryEpoch,
316        row_list: Vec<OwnedRow>,
317        table: BatchTable<S>,
318        chunk_size: usize,
319    ) -> Self {
320        Self {
321            outer_side_key_types,
322            inner_side_key_types,
323            lookup_prefix_len,
324            epoch,
325            row_list,
326            table,
327            chunk_size,
328        }
329    }
330}
331
332#[async_trait::async_trait]
333impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
334    fn reset(&mut self) {
335        // PASS
336    }
337
338    /// Fetch row from inner side table by the scan range added.
339    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
340        let mut scan_range = ScanRange::full_table_scan();
341
342        for ((datum, outer_type), inner_type) in key_datums
343            .into_iter()
344            .zip_eq_fast(
345                self.outer_side_key_types
346                    .iter()
347                    .take(self.lookup_prefix_len),
348            )
349            .zip_eq_fast(
350                self.inner_side_key_types
351                    .iter()
352                    .take(self.lookup_prefix_len),
353            )
354        {
355            let datum = if inner_type == outer_type {
356                datum
357            } else {
358                bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
359            };
360
361            scan_range.eq_conds.push(datum);
362        }
363
364        let pk_prefix = OwnedRow::new(scan_range.eq_conds);
365
366        if self.lookup_prefix_len == self.table.pk_indices().len() {
367            let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
368
369            if let Some(row) = row {
370                self.row_list.push(row);
371            }
372        } else {
373            let iter = self
374                .table
375                .batch_iter_with_pk_bounds(
376                    self.epoch.into(),
377                    &pk_prefix,
378                    ..,
379                    false,
380                    PrefetchOptions::default(),
381                )
382                .await?;
383
384            pin_mut!(iter);
385            while let Some(row) = iter.next_row().await? {
386                self.row_list.push(row);
387            }
388        }
389
390        Ok(())
391    }
392
393    /// Build a `BufferChunkExecutor` to return all its rows fetched by `add_scan_range` before.
394    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
395        let mut data_chunk_builder =
396            DataChunkBuilder::new(self.table.schema().data_types(), self.chunk_size);
397        let mut chunk_list = Vec::new();
398
399        let mut new_row_list = vec![];
400        swap(&mut new_row_list, &mut self.row_list);
401
402        for row in new_row_list {
403            if let Some(chunk) = data_chunk_builder.append_one_row(row) {
404                chunk_list.push(chunk);
405            }
406        }
407        if let Some(chunk) = data_chunk_builder.consume_all() {
408            chunk_list.push(chunk);
409        }
410
411        Ok(Box::new(BufferChunkExecutor::new(
412            self.table.schema().clone(),
413            chunk_list,
414        )))
415    }
416}