risingwave_stream/executor/
nested_loop_temporal_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::StreamExt;
19use futures_async_stream::try_stream;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
22use risingwave_common::bitmap::BitmapBuilder;
23use risingwave_common::types::DataType;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_expr::expr::NonStrictExpression;
26use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
27use risingwave_storage::StateStore;
28use risingwave_storage::store::PrefetchOptions;
29use risingwave_storage::table::batch_table::BatchTable;
30
31use super::join::{JoinType, JoinTypePrimitive};
32use super::temporal_join::{InternalMessage, align_input, apply_indices_map, phase1};
33use super::{Execute, ExecutorInfo, Message, StreamExecutorError};
34use crate::common::metrics::MetricsInfo;
35use crate::executor::join::builder::JoinStreamChunkBuilder;
36use crate::executor::monitor::StreamingMetrics;
37use crate::executor::{ActorContextRef, Executor};
38
39pub struct NestedLoopTemporalJoinExecutor<S: StateStore, const T: JoinTypePrimitive> {
40    ctx: ActorContextRef,
41    #[allow(dead_code)]
42    info: ExecutorInfo,
43    left: Executor,
44    right: Executor,
45    right_table: TemporalSide<S>,
46    condition: Option<NonStrictExpression>,
47    output_indices: Vec<usize>,
48    chunk_size: usize,
49    // TODO: update metrics
50    #[allow(dead_code)]
51    metrics: Arc<StreamingMetrics>,
52}
53
54struct TemporalSide<S: StateStore> {
55    source: BatchTable<S>,
56}
57
58impl<S: StateStore> TemporalSide<S> {}
59
60#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
61#[allow(clippy::too_many_arguments)]
62async fn phase1_handle_chunk<S: StateStore, E: phase1::Phase1Evaluation>(
63    chunk_size: usize,
64    right_size: usize,
65    full_schema: Vec<DataType>,
66    epoch: HummockEpoch,
67    right_table: &mut TemporalSide<S>,
68    chunk: StreamChunk,
69) {
70    let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
71
72    for (op, left_row) in chunk.rows() {
73        let mut matched = false;
74        #[for_await]
75        for right_row in right_table
76            .source
77            .batch_iter(
78                HummockReadEpoch::NoWait(epoch),
79                false,
80                PrefetchOptions::prefetch_for_large_range_scan(),
81            )
82            .await?
83        {
84            let right_row = right_row?;
85            matched = true;
86            if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) {
87                yield chunk;
88            }
89        }
90        if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) {
91            yield chunk;
92        }
93    }
94    if let Some(chunk) = builder.take() {
95        yield chunk;
96    }
97}
98
99impl<S: StateStore, const T: JoinTypePrimitive> NestedLoopTemporalJoinExecutor<S, T> {
100    #[expect(clippy::too_many_arguments)]
101    pub fn new(
102        ctx: ActorContextRef,
103        info: ExecutorInfo,
104        left: Executor,
105        right: Executor,
106        table: BatchTable<S>,
107        condition: Option<NonStrictExpression>,
108        output_indices: Vec<usize>,
109        metrics: Arc<StreamingMetrics>,
110        chunk_size: usize,
111    ) -> Self {
112        let _metrics_info = MetricsInfo::new(
113            metrics.clone(),
114            table.table_id().table_id,
115            ctx.id,
116            "nested loop temporal join",
117        );
118
119        Self {
120            ctx: ctx.clone(),
121            info,
122            left,
123            right,
124            right_table: TemporalSide { source: table },
125            condition,
126            output_indices,
127            chunk_size,
128            metrics,
129        }
130    }
131
132    #[try_stream(ok = Message, error = StreamExecutorError)]
133    async fn into_stream(mut self) {
134        let right_size = self.right.schema().len();
135
136        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
137            &self.output_indices,
138            self.left.schema().len(),
139            right_size,
140        );
141
142        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
143
144        let mut prev_epoch = None;
145
146        let full_schema: Vec<_> = self
147            .left
148            .schema()
149            .data_types()
150            .into_iter()
151            .chain(self.right.schema().data_types().into_iter())
152            .collect();
153
154        #[for_await]
155        for msg in align_input::<false>(self.left, self.right) {
156            match msg? {
157                InternalMessage::WaterMark(watermark) => {
158                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
159                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
160                }
161                InternalMessage::Chunk(chunk) => {
162                    let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
163
164                    let full_schema = full_schema.clone();
165
166                    if T == JoinType::Inner {
167                        let st1 = phase1_handle_chunk::<S, phase1::Inner>(
168                            self.chunk_size,
169                            right_size,
170                            full_schema,
171                            epoch,
172                            &mut self.right_table,
173                            chunk,
174                        );
175                        #[for_await]
176                        for chunk in st1 {
177                            let chunk = chunk?;
178                            let new_chunk = if let Some(ref cond) = self.condition {
179                                let (data_chunk, ops) = chunk.into_parts();
180                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
181                                let passed_bitmap =
182                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
183                                let (columns, vis) = data_chunk.into_parts();
184                                let new_vis = vis & passed_bitmap;
185                                StreamChunk::with_visibility(ops, columns, new_vis)
186                            } else {
187                                chunk
188                            };
189                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
190                            yield Message::Chunk(new_chunk);
191                        }
192                    } else if let Some(ref cond) = self.condition {
193                        // Joined result without evaluating non-lookup conditions.
194                        let st1 = phase1_handle_chunk::<S, phase1::LeftOuterWithCond>(
195                            self.chunk_size,
196                            right_size,
197                            full_schema,
198                            epoch,
199                            &mut self.right_table,
200                            chunk,
201                        );
202                        let mut matched_count = 0usize;
203                        #[for_await]
204                        for chunk in st1 {
205                            let chunk = chunk?;
206                            let (data_chunk, ops) = chunk.into_parts();
207                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
208                            let passed_bitmap =
209                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
210                            let (columns, vis) = data_chunk.into_parts();
211                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
212                            for (passed, not_match_end) in
213                                passed_bitmap.iter().zip_eq_debug(vis.iter())
214                            {
215                                let is_match_end = !not_match_end;
216                                let vis = if is_match_end && matched_count == 0 {
217                                    // Nothing is matched, so the marker row should be visible.
218                                    true
219                                } else if is_match_end {
220                                    // reset the count
221                                    matched_count = 0;
222                                    // rows found, so the marker row should be invisible.
223                                    false
224                                } else {
225                                    if passed {
226                                        matched_count += 1;
227                                    }
228                                    passed
229                                };
230                                new_vis.append(vis);
231                            }
232                            let new_chunk = apply_indices_map(
233                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
234                                &self.output_indices,
235                            );
236                            yield Message::Chunk(new_chunk);
237                        }
238                        // The last row should always be marker row,
239                        assert_eq!(matched_count, 0);
240                    } else {
241                        let st1 = phase1_handle_chunk::<S, phase1::LeftOuter>(
242                            self.chunk_size,
243                            right_size,
244                            full_schema,
245                            epoch,
246                            &mut self.right_table,
247                            chunk,
248                        );
249                        #[for_await]
250                        for chunk in st1 {
251                            let chunk = chunk?;
252                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
253                            yield Message::Chunk(new_chunk);
254                        }
255                    }
256                }
257                InternalMessage::Barrier(chunk, barrier) => {
258                    assert!(chunk.is_empty());
259                    if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
260                        let _vnodes = self.right_table.source.update_vnode_bitmap(vnodes.clone());
261                    }
262                    prev_epoch = Some(barrier.epoch.curr);
263                    yield Message::Barrier(barrier)
264                }
265            }
266        }
267    }
268}
269
270impl<S: StateStore, const T: JoinTypePrimitive> Execute for NestedLoopTemporalJoinExecutor<S, T> {
271    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
272        self.into_stream().boxed()
273    }
274}