risingwave_batch_executors/executor/join/lookup_join_base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::FilterByBitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::expr::BoxedExpression;

use super::AsOfDesc;
use crate::error::BatchError;
use crate::executor::join::chunked_data::ChunkedData;
use crate::executor::{
    BoxedDataChunkListStream, BoxedExecutor, BufferChunkExecutor, EquiJoinParams, HashJoinExecutor,
    JoinHashMap, JoinType, LookupExecutorBuilder, RowId, utils,
};
use crate::task::ShutdownToken;

/// Lookup Join Base.
/// Used by `LocalLookupJoinExecutor` and `DistributedLookupJoinExecutor`.
pub struct LookupJoinBase<K> {
    pub join_type: JoinType,
    pub condition: Option<BoxedExpression>,
    pub outer_side_input: BoxedExecutor,
    pub outer_side_data_types: Vec<DataType>, // Data types of all columns of the outer side table
    pub outer_side_key_idxs: Vec<usize>,
    pub inner_side_builder: Box<dyn LookupExecutorBuilder>,
    pub inner_side_key_types: Vec<DataType>, // Data types of only the key columns of the inner side table
    pub inner_side_key_idxs: Vec<usize>,
    pub null_safe: Vec<bool>,
    pub lookup_prefix_len: usize,
    pub chunk_builder: DataChunkBuilder,
    pub schema: Schema,
    pub output_indices: Vec<usize>,
    pub chunk_size: usize,
    pub asof_desc: Option<AsOfDesc>,
    pub identity: String,
    pub shutdown_rx: ShutdownToken,
    pub mem_ctx: MemoryContext,
    pub _phantom: PhantomData<K>,
}

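/// Minimum number of outer side rows to buffer per batch before looking up the inner side.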
const AT_LEAST_OUTER_SIDE_ROWS: usize = 512;

impl<K: HashKey> LookupJoinBase<K> {
    /// High level execution flow:
    /// Repeat 1-3 until the outer side input is exhausted:
    ///   1. Read N rows from the outer side input, deduplicate their lookup keys, and send the
    ///      keys to the inner side builder.
    ///   2. The inner side input looks up the inner side table with these keys and the result is
    ///      used to build a hash map.
    ///   3. Outer side rows are joined with matching inner side rows by probing the hash map.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_execute(mut self: Box<Self>) {
        let outer_side_schema = self.outer_side_input.schema().clone();

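        // Bitmap marking which key columns use null-safe equality (i.e. NULL matches NULL).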
        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);

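        // Read the outer side input in batches; each batch drives one lookup/build/probe round below.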
        let mut outer_side_batch_read_stream: BoxedDataChunkListStream =
            utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS);

        while let Some(chunk_list) = outer_side_batch_read_stream.next().await {
            let chunk_list = chunk_list?;

            // Collect the distinct lookup key prefixes (the first `lookup_prefix_len` key
            // columns) of the outer side rows, sorted and deduplicated.
            let groups = chunk_list
                .iter()
                .flat_map(|chunk| {
                    chunk.rows().map(|row| {
                        self.outer_side_key_idxs
                            .iter()
                            .take(self.lookup_prefix_len)
                            .map(|&idx| row.datum_at(idx).to_owned_datum())
                            .collect_vec()
                    })
                })
                .sorted_by(|a, b| cmp_datum_iter(a, b, std::iter::repeat(OrderType::default())))
                .dedup()
                .collect_vec();

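            // Register one scan range per distinct key, then build the executor that reads the
            // inner side for exactly those ranges.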
            self.inner_side_builder.reset();
            for row_key in groups {
                self.inner_side_builder.add_scan_range(row_key).await?;
            }
            let inner_side_input = self.inner_side_builder.build_executor().await?;

            // The lookup join's outer side becomes the probe side of the hash join,
            // while its inner side becomes the build side.
            let hash_join_probe_side_input = Box::new(BufferChunkExecutor::new(
                outer_side_schema.clone(),
                chunk_list,
            ));
            let hash_join_build_side_input = inner_side_input;
            let hash_join_probe_data_types = self.outer_side_data_types.clone();
            let hash_join_build_data_types = hash_join_build_side_input.schema().data_types();
            let hash_join_probe_side_key_idxs = self.outer_side_key_idxs.clone();
            let hash_join_build_side_key_idxs = self.inner_side_key_idxs.clone();

            let full_data_types = [
                hash_join_probe_data_types.clone(),
                hash_join_build_data_types.clone(),
            ]
            .concat();

            // We need a temporary variable to record the heap size, since we free the build side
            // hash map in each loop iteration and the subtraction is not performed automatically.
            let mut tmp_heap_size = 0i64;

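            // Buffer all non-empty inner side chunks as the hash join build side, accounting
            // their heap size against the memory context.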
            let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
            let mut build_row_count = 0;
            #[for_await]
            for build_chunk in hash_join_build_side_input.execute() {
                let build_chunk = build_chunk?;
                if build_chunk.cardinality() > 0 {
                    build_row_count += build_chunk.cardinality();
                    let chunk_estimated_heap_size = build_chunk.estimated_heap_size() as i64;
                    self.mem_ctx.add(chunk_estimated_heap_size);
                    tmp_heap_size += chunk_estimated_heap_size;
                    build_side.push(build_chunk);
                }
            }
            let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
                build_row_count,
                PrecomputedBuildHasher,
                self.mem_ctx.global_allocator(),
            );
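            // For every build row, stores the id of the next build row with the same join key;
            // `hash_map.insert` below returns the previous entry for the key, forming this chain.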
            let mut next_build_row_with_same_key =
                ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

            // Build hash map
            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
                let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk);

                for (build_row_id, build_key) in build_keys
                    .into_iter()
                    .enumerate()
                    .filter_by_bitmap(build_chunk.visibility())
                {
                    // Only insert the key into the hash map if it is consistent with the
                    // null-safe restriction.
                    if build_key.null_bitmap().is_subset(&null_matched) {
                        let row_id = RowId::new(build_chunk_id, build_row_id);
                        let build_key_estimated_heap_size = build_key.estimated_heap_size() as i64;
                        self.mem_ctx.add(build_key_estimated_heap_size);
                        tmp_heap_size += build_key_estimated_heap_size;
                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
                    }
                }
            }

            let params = EquiJoinParams::new(
                hash_join_probe_side_input,
                hash_join_probe_data_types,
                hash_join_probe_side_key_idxs,
                build_side,
                hash_join_build_data_types,
                full_data_types,
                hash_map,
                next_build_row_with_same_key,
                self.chunk_size,
                self.shutdown_rx.clone(),
                self.asof_desc.clone(),
            );

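            // A non-equi condition is only applied here for regular joins; ASOF joins always take
            // the plain branch below.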
            if let Some(cond) = self.condition.as_ref()
                && !params.is_asof_join()
            {
                let stream = match self.join_type {
                    JoinType::Inner => {
                        HashJoinExecutor::do_inner_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftOuter => {
                        HashJoinExecutor::do_left_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftSemi => {
                        HashJoinExecutor::do_left_semi_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftAnti => {
                        HashJoinExecutor::do_left_anti_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightOuter
                    | JoinType::RightSemi
                    | JoinType::RightAnti
                    | JoinType::FullOuter
                    | JoinType::AsOfInner
                    | JoinType::AsOfLeftOuter => unimplemented!(),
                };
                // For joins with a non-equi condition, we need an output chunk builder to align
                // the output chunks.
                let mut output_chunk_builder =
                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
                #[for_await]
                for chunk in stream {
                    for output_chunk in
                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
                    {
                        yield output_chunk
                    }
                }
                if let Some(output_chunk) = output_chunk_builder.consume_all() {
                    yield output_chunk
                }
            } else {
                let stream = match self.join_type {
                    JoinType::Inner | JoinType::AsOfInner => {
                        HashJoinExecutor::do_inner_join(params)
                    }
                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
                        HashJoinExecutor::do_left_outer_join(params)
                    }
                    JoinType::LeftSemi => HashJoinExecutor::do_left_semi_anti_join::<false>(params),
                    JoinType::LeftAnti => HashJoinExecutor::do_left_semi_anti_join::<true>(params),
                    JoinType::RightOuter
                    | JoinType::RightSemi
                    | JoinType::RightAnti
                    | JoinType::FullOuter => unimplemented!(),
                };
                #[for_await]
                for chunk in stream {
                    yield chunk?.project(&self.output_indices)
                }
            }

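            // Release the memory accounted for this batch's build side before the next iteration.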
            self.mem_ctx.add(-tmp_heap_size);
        }
    }
}