// risingwave_stream/executor/nested_loop_temporal_join.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::ops::Bound;
17use std::sync::Arc;
19use futures::{StreamExt, TryStreamExt, pin_mut, stream};
20use futures_async_stream::try_stream;
21use risingwave_common::array::StreamChunk;
22use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::DataType;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::expr::NonStrictExpression;
29use risingwave_storage::StateStore;
30use risingwave_storage::row_serde::value_serde::ValueRowSerde;
31use risingwave_storage::store::PrefetchOptions;
33use super::join::{JoinType, JoinTypePrimitive};
34use super::temporal_join::{
35    InternalMessage, align_input, apply_indices_map, expect_first_barrier, phase1,
36};
37use super::{Execute, ExecutorInfo, Message, StreamExecutorError};
38use crate::common::metrics::MetricsInfo;
39use crate::common::table::state_table::ReplicatedStateTable;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::{ActorContextRef, Executor};
44pub struct NestedLoopTemporalJoinExecutor<
45    S: StateStore,
46    SD: ValueRowSerde,
47    const T: JoinTypePrimitive,
48> {
49    ctx: ActorContextRef,
50    #[expect(dead_code)]
51    info: ExecutorInfo,
52    left: Executor,
53    right: Executor,
54    right_table: TemporalSide<S, SD>,
55    condition: Option<NonStrictExpression>,
56    output_indices: Vec<usize>,
57    chunk_size: usize,
58    // TODO: update metrics
59    #[expect(dead_code)]
60    metrics: Arc<StreamingMetrics>,
61}
62
63struct TemporalSide<S: StateStore, SD: ValueRowSerde> {
64    source: ReplicatedStateTable<S, SD>,
65}
66
67impl<S: StateStore, SD: ValueRowSerde> TemporalSide<S, SD> {}
68
69#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
70async fn phase1_handle_chunk<S: StateStore, SD: ValueRowSerde, E: phase1::Phase1Evaluation>(
71    chunk_size: usize,
72    right_size: usize,
73    full_schema: Vec<DataType>,
74    right_table: &mut TemporalSide<S, SD>,
75    chunk: StreamChunk,
76) {
77    let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
78
79    for (op, left_row) in chunk.rows() {
80        let mut matched = false;
81        // Create a per-vnode stream for each vnode and flatten them concurrently.
82        let source = &right_table.source;
83        let right_rows = stream::iter(source.vnodes().iter_vnodes())
84            .then(|vnode| async move {
85                Ok::<_, StreamExecutorError>(Box::pin(
86                    source
87                        .iter_with_vnode_and_output_indices(
88                            vnode,
89                            &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
90                            PrefetchOptions::prefetch_for_large_range_scan(),
91                        )
92                        .await?,
93                ))
94            })
95            .try_flatten_unordered(None);
96        pin_mut!(right_rows);
97        #[for_await]
98        for right_row in right_rows {
99            let right_row = right_row?;
100            matched = true;
101            if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) {
102                yield chunk;
103            }
104        }
105        if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) {
106            yield chunk;
107        }
108    }
109    if let Some(chunk) = builder.take() {
110        yield chunk;
111    }
112}
113
114impl<S: StateStore, SD: ValueRowSerde, const T: JoinTypePrimitive>
115    NestedLoopTemporalJoinExecutor<S, SD, T>
116{
117    #[expect(clippy::too_many_arguments)]
118    pub fn new(
119        ctx: ActorContextRef,
120        info: ExecutorInfo,
121        left: Executor,
122        right: Executor,
123        table: ReplicatedStateTable<S, SD>,
124        condition: Option<NonStrictExpression>,
125        output_indices: Vec<usize>,
126        metrics: Arc<StreamingMetrics>,
127        chunk_size: usize,
128    ) -> Self {
129        let _metrics_info = MetricsInfo::new(
130            metrics.clone(),
131            table.table_id(),
132            ctx.id,
133            "nested loop temporal join",
134        );
135
136        Self {
137            ctx,
138            info,
139            left,
140            right,
141            right_table: TemporalSide { source: table },
142            condition,
143            output_indices,
144            chunk_size,
145            metrics,
146        }
147    }
148
149    #[try_stream(ok = Message, error = StreamExecutorError)]
150    async fn into_stream(mut self) {
151        let right_size = self.right.schema().len();
152
153        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
154            &self.output_indices,
155            self.left.schema().len(),
156            right_size,
157        );
158
159        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
160
161        let full_schema: Vec<_> = self
162            .left
163            .schema()
164            .data_types()
165            .into_iter()
166            .chain(self.right.schema().data_types().into_iter())
167            .collect();
168
169        let input = align_input::<true>(self.left, self.right);
170        pin_mut!(input);
171
172        let barrier = expect_first_barrier(&mut input).await?;
173        let barrier_epoch = barrier.epoch;
174        yield Message::Barrier(barrier);
175        self.right_table.source.init_epoch(barrier_epoch).await?;
176
177        #[for_await]
178        for msg in input {
179            match msg? {
180                InternalMessage::WaterMark(watermark) => {
181                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
182                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
183                }
184                InternalMessage::Chunk(chunk) => {
185                    let full_schema = full_schema.clone();
186
187                    if T == JoinType::Inner {
188                        let st1 = phase1_handle_chunk::<S, SD, phase1::Inner>(
189                            self.chunk_size,
190                            right_size,
191                            full_schema,
192                            &mut self.right_table,
193                            chunk,
194                        );
195                        #[for_await]
196                        for chunk in st1 {
197                            let chunk = chunk?;
198                            let new_chunk = if let Some(ref cond) = self.condition {
199                                let (data_chunk, ops) = chunk.into_parts();
200                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
201                                let passed_bitmap =
202                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
203                                let (columns, vis) = data_chunk.into_parts();
204                                let new_vis = vis & passed_bitmap;
205                                StreamChunk::with_visibility(ops, columns, new_vis)
206                            } else {
207                                chunk
208                            };
209                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
210                            yield Message::Chunk(new_chunk);
211                        }
212                    } else if let Some(ref cond) = self.condition {
213                        // Joined result without evaluating non-lookup conditions.
214                        let st1 = phase1_handle_chunk::<S, SD, phase1::LeftOuterWithCond>(
215                            self.chunk_size,
216                            right_size,
217                            full_schema,
218                            &mut self.right_table,
219                            chunk,
220                        );
221                        let mut matched_count = 0usize;
222                        #[for_await]
223                        for chunk in st1 {
224                            let chunk = chunk?;
225                            let (data_chunk, ops) = chunk.into_parts();
226                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
227                            let passed_bitmap =
228                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
229                            let (columns, vis) = data_chunk.into_parts();
230                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
231                            for (passed, not_match_end) in
232                                passed_bitmap.iter().zip_eq_debug(vis.iter())
233                            {
234                                let is_match_end = !not_match_end;
235                                let vis = if is_match_end && matched_count == 0 {
236                                    // Nothing is matched, so the marker row should be visible.
237                                    true
238                                } else if is_match_end {
239                                    // reset the count
240                                    matched_count = 0;
241                                    // rows found, so the marker row should be invisible.
242                                    false
243                                } else {
244                                    if passed {
245                                        matched_count += 1;
246                                    }
247                                    passed
248                                };
249                                new_vis.append(vis);
250                            }
251                            let new_chunk = apply_indices_map(
252                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
253                                &self.output_indices,
254                            );
255                            yield Message::Chunk(new_chunk);
256                        }
257                        // The last row should always be marker row,
258                        assert_eq!(matched_count, 0);
259                    } else {
260                        let st1 = phase1_handle_chunk::<S, SD, phase1::LeftOuter>(
261                            self.chunk_size,
262                            right_size,
263                            full_schema,
264                            &mut self.right_table,
265                            chunk,
266                        );
267                        #[for_await]
268                        for chunk in st1 {
269                            let chunk = chunk?;
270                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
271                            yield Message::Chunk(new_chunk);
272                        }
273                    }
274                }
275                InternalMessage::Barrier(updates, barrier) => {
276                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
277
278                    // Write right-side chunks to the replicated state table
279                    for chunk in updates {
280                        self.right_table.source.write_chunk(chunk);
281                    }
282
283                    let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
284
285                    yield Message::Barrier(barrier);
286
287                    right_post_commit
288                        .post_yield_barrier(update_vnode_bitmap)
289                        .await?;
290                }
291            }
292        }
293    }
294}
295
296impl<S: StateStore, SD: ValueRowSerde, const T: JoinTypePrimitive> Execute
297    for NestedLoopTemporalJoinExecutor<S, SD, T>
298{
299    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
300        self.into_stream().boxed()
301    }
302}