risingwave_stream/executor/
nested_loop_temporal_join.rs1use std::collections::HashMap;
16use std::ops::Bound;
17use std::sync::Arc;
18
19use futures::{StreamExt, TryStreamExt, pin_mut, stream};
20use futures_async_stream::try_stream;
21use risingwave_common::array::StreamChunk;
22use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::hash::VnodeBitmapExt;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::DataType;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::expr::NonStrictExpression;
29use risingwave_storage::StateStore;
30use risingwave_storage::row_serde::value_serde::ValueRowSerde;
31use risingwave_storage::store::PrefetchOptions;
32
33use super::join::{JoinType, JoinTypePrimitive};
34use super::temporal_join::{
35 InternalMessage, align_input, apply_indices_map, expect_first_barrier, phase1,
36};
37use super::{Execute, ExecutorInfo, Message, StreamExecutorError};
38use crate::common::metrics::MetricsInfo;
39use crate::common::table::state_table::ReplicatedStateTable;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::{ActorContextRef, Executor};
43
44pub struct NestedLoopTemporalJoinExecutor<
45 S: StateStore,
46 SD: ValueRowSerde,
47 const T: JoinTypePrimitive,
48> {
49 ctx: ActorContextRef,
50 #[expect(dead_code)]
51 info: ExecutorInfo,
52 left: Executor,
53 right: Executor,
54 right_table: TemporalSide<S, SD>,
55 condition: Option<NonStrictExpression>,
56 output_indices: Vec<usize>,
57 chunk_size: usize,
58 #[expect(dead_code)]
60 metrics: Arc<StreamingMetrics>,
61}
62
63struct TemporalSide<S: StateStore, SD: ValueRowSerde> {
64 source: ReplicatedStateTable<S, SD>,
65}
66
67impl<S: StateStore, SD: ValueRowSerde> TemporalSide<S, SD> {}
68
69#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
70async fn phase1_handle_chunk<S: StateStore, SD: ValueRowSerde, E: phase1::Phase1Evaluation>(
71 chunk_size: usize,
72 right_size: usize,
73 full_schema: Vec<DataType>,
74 right_table: &mut TemporalSide<S, SD>,
75 chunk: StreamChunk,
76) {
77 let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
78
79 for (op, left_row) in chunk.rows() {
80 let mut matched = false;
81 let source = &right_table.source;
83 let right_rows = stream::iter(source.vnodes().iter_vnodes())
84 .then(|vnode| async move {
85 Ok::<_, StreamExecutorError>(Box::pin(
86 source
87 .iter_with_vnode_and_output_indices(
88 vnode,
89 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
90 PrefetchOptions::prefetch_for_large_range_scan(),
91 )
92 .await?,
93 ))
94 })
95 .try_flatten_unordered(None);
96 pin_mut!(right_rows);
97 #[for_await]
98 for right_row in right_rows {
99 let right_row = right_row?;
100 matched = true;
101 if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) {
102 yield chunk;
103 }
104 }
105 if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) {
106 yield chunk;
107 }
108 }
109 if let Some(chunk) = builder.take() {
110 yield chunk;
111 }
112}
113
114impl<S: StateStore, SD: ValueRowSerde, const T: JoinTypePrimitive>
115 NestedLoopTemporalJoinExecutor<S, SD, T>
116{
117 #[expect(clippy::too_many_arguments)]
118 pub fn new(
119 ctx: ActorContextRef,
120 info: ExecutorInfo,
121 left: Executor,
122 right: Executor,
123 table: ReplicatedStateTable<S, SD>,
124 condition: Option<NonStrictExpression>,
125 output_indices: Vec<usize>,
126 metrics: Arc<StreamingMetrics>,
127 chunk_size: usize,
128 ) -> Self {
129 let _metrics_info = MetricsInfo::new(
130 metrics.clone(),
131 table.table_id(),
132 ctx.id,
133 "nested loop temporal join",
134 );
135
136 Self {
137 ctx,
138 info,
139 left,
140 right,
141 right_table: TemporalSide { source: table },
142 condition,
143 output_indices,
144 chunk_size,
145 metrics,
146 }
147 }
148
149 #[try_stream(ok = Message, error = StreamExecutorError)]
150 async fn into_stream(mut self) {
151 let right_size = self.right.schema().len();
152
153 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
154 &self.output_indices,
155 self.left.schema().len(),
156 right_size,
157 );
158
159 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
160
161 let full_schema: Vec<_> = self
162 .left
163 .schema()
164 .data_types()
165 .into_iter()
166 .chain(self.right.schema().data_types().into_iter())
167 .collect();
168
169 let input = align_input::<true>(self.left, self.right);
170 pin_mut!(input);
171
172 let barrier = expect_first_barrier(&mut input).await?;
173 let barrier_epoch = barrier.epoch;
174 yield Message::Barrier(barrier);
175 self.right_table.source.init_epoch(barrier_epoch).await?;
176
177 #[for_await]
178 for msg in input {
179 match msg? {
180 InternalMessage::WaterMark(watermark) => {
181 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
182 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
183 }
184 InternalMessage::Chunk(chunk) => {
185 let full_schema = full_schema.clone();
186
187 if T == JoinType::Inner {
188 let st1 = phase1_handle_chunk::<S, SD, phase1::Inner>(
189 self.chunk_size,
190 right_size,
191 full_schema,
192 &mut self.right_table,
193 chunk,
194 );
195 #[for_await]
196 for chunk in st1 {
197 let chunk = chunk?;
198 let new_chunk = if let Some(ref cond) = self.condition {
199 let (data_chunk, ops) = chunk.into_parts();
200 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
201 let passed_bitmap =
202 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
203 let (columns, vis) = data_chunk.into_parts();
204 let new_vis = vis & passed_bitmap;
205 StreamChunk::with_visibility(ops, columns, new_vis)
206 } else {
207 chunk
208 };
209 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
210 yield Message::Chunk(new_chunk);
211 }
212 } else if let Some(ref cond) = self.condition {
213 let st1 = phase1_handle_chunk::<S, SD, phase1::LeftOuterWithCond>(
215 self.chunk_size,
216 right_size,
217 full_schema,
218 &mut self.right_table,
219 chunk,
220 );
221 let mut matched_count = 0usize;
222 #[for_await]
223 for chunk in st1 {
224 let chunk = chunk?;
225 let (data_chunk, ops) = chunk.into_parts();
226 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
227 let passed_bitmap =
228 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
229 let (columns, vis) = data_chunk.into_parts();
230 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
231 for (passed, not_match_end) in
232 passed_bitmap.iter().zip_eq_debug(vis.iter())
233 {
234 let is_match_end = !not_match_end;
235 let vis = if is_match_end && matched_count == 0 {
236 true
238 } else if is_match_end {
239 matched_count = 0;
241 false
243 } else {
244 if passed {
245 matched_count += 1;
246 }
247 passed
248 };
249 new_vis.append(vis);
250 }
251 let new_chunk = apply_indices_map(
252 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
253 &self.output_indices,
254 );
255 yield Message::Chunk(new_chunk);
256 }
257 assert_eq!(matched_count, 0);
259 } else {
260 let st1 = phase1_handle_chunk::<S, SD, phase1::LeftOuter>(
261 self.chunk_size,
262 right_size,
263 full_schema,
264 &mut self.right_table,
265 chunk,
266 );
267 #[for_await]
268 for chunk in st1 {
269 let chunk = chunk?;
270 let new_chunk = apply_indices_map(chunk, &self.output_indices);
271 yield Message::Chunk(new_chunk);
272 }
273 }
274 }
275 InternalMessage::Barrier(updates, barrier) => {
276 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
277
278 for chunk in updates {
280 self.right_table.source.write_chunk(chunk);
281 }
282
283 let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
284
285 yield Message::Barrier(barrier);
286
287 right_post_commit
288 .post_yield_barrier(update_vnode_bitmap)
289 .await?;
290 }
291 }
292 }
293 }
294}
295
296impl<S: StateStore, SD: ValueRowSerde, const T: JoinTypePrimitive> Execute
297 for NestedLoopTemporalJoinExecutor<S, SD, T>
298{
299 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
300 self.into_stream().boxed()
301 }
302}