risingwave_stream/executor/
nested_loop_temporal_join.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::StreamExt;
19use futures_async_stream::try_stream;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
22use risingwave_common::bitmap::BitmapBuilder;
23use risingwave_common::types::DataType;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_expr::expr::NonStrictExpression;
26use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
27use risingwave_storage::StateStore;
28use risingwave_storage::store::PrefetchOptions;
29use risingwave_storage::table::batch_table::BatchTable;
30
31use super::join::{JoinType, JoinTypePrimitive};
32use super::temporal_join::{InternalMessage, align_input, apply_indices_map, phase1};
33use super::{Execute, ExecutorInfo, Message, StreamExecutorError};
34use crate::common::metrics::MetricsInfo;
35use crate::executor::join::builder::JoinStreamChunkBuilder;
36use crate::executor::monitor::StreamingMetrics;
37use crate::executor::{ActorContextRef, Executor};
38
39pub struct NestedLoopTemporalJoinExecutor<S: StateStore, const T: JoinTypePrimitive> {
40 ctx: ActorContextRef,
41 #[allow(dead_code)]
42 info: ExecutorInfo,
43 left: Executor,
44 right: Executor,
45 right_table: TemporalSide<S>,
46 condition: Option<NonStrictExpression>,
47 output_indices: Vec<usize>,
48 chunk_size: usize,
49 #[allow(dead_code)]
51 metrics: Arc<StreamingMetrics>,
52}
53
54struct TemporalSide<S: StateStore> {
55 source: BatchTable<S>,
56}
57
58impl<S: StateStore> TemporalSide<S> {}
59
60#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
61#[allow(clippy::too_many_arguments)]
62async fn phase1_handle_chunk<S: StateStore, E: phase1::Phase1Evaluation>(
63 chunk_size: usize,
64 right_size: usize,
65 full_schema: Vec<DataType>,
66 epoch: HummockEpoch,
67 right_table: &mut TemporalSide<S>,
68 chunk: StreamChunk,
69) {
70 let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
71
72 for (op, left_row) in chunk.rows() {
73 let mut matched = false;
74 #[for_await]
75 for right_row in right_table
76 .source
77 .batch_iter(
78 HummockReadEpoch::NoWait(epoch),
79 false,
80 PrefetchOptions::prefetch_for_large_range_scan(),
81 )
82 .await?
83 {
84 let right_row = right_row?;
85 matched = true;
86 if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) {
87 yield chunk;
88 }
89 }
90 if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) {
91 yield chunk;
92 }
93 }
94 if let Some(chunk) = builder.take() {
95 yield chunk;
96 }
97}
98
99impl<S: StateStore, const T: JoinTypePrimitive> NestedLoopTemporalJoinExecutor<S, T> {
100 #[expect(clippy::too_many_arguments)]
101 pub fn new(
102 ctx: ActorContextRef,
103 info: ExecutorInfo,
104 left: Executor,
105 right: Executor,
106 table: BatchTable<S>,
107 condition: Option<NonStrictExpression>,
108 output_indices: Vec<usize>,
109 metrics: Arc<StreamingMetrics>,
110 chunk_size: usize,
111 ) -> Self {
112 let _metrics_info = MetricsInfo::new(
113 metrics.clone(),
114 table.table_id().table_id,
115 ctx.id,
116 "nested loop temporal join",
117 );
118
119 Self {
120 ctx: ctx.clone(),
121 info,
122 left,
123 right,
124 right_table: TemporalSide { source: table },
125 condition,
126 output_indices,
127 chunk_size,
128 metrics,
129 }
130 }
131
132 #[try_stream(ok = Message, error = StreamExecutorError)]
133 async fn into_stream(mut self) {
134 let right_size = self.right.schema().len();
135
136 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
137 &self.output_indices,
138 self.left.schema().len(),
139 right_size,
140 );
141
142 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
143
144 let mut prev_epoch = None;
145
146 let full_schema: Vec<_> = self
147 .left
148 .schema()
149 .data_types()
150 .into_iter()
151 .chain(self.right.schema().data_types().into_iter())
152 .collect();
153
154 #[for_await]
155 for msg in align_input::<false>(self.left, self.right) {
156 match msg? {
157 InternalMessage::WaterMark(watermark) => {
158 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
159 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
160 }
161 InternalMessage::Chunk(chunk) => {
162 let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
163
164 let full_schema = full_schema.clone();
165
166 if T == JoinType::Inner {
167 let st1 = phase1_handle_chunk::<S, phase1::Inner>(
168 self.chunk_size,
169 right_size,
170 full_schema,
171 epoch,
172 &mut self.right_table,
173 chunk,
174 );
175 #[for_await]
176 for chunk in st1 {
177 let chunk = chunk?;
178 let new_chunk = if let Some(ref cond) = self.condition {
179 let (data_chunk, ops) = chunk.into_parts();
180 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
181 let passed_bitmap =
182 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
183 let (columns, vis) = data_chunk.into_parts();
184 let new_vis = vis & passed_bitmap;
185 StreamChunk::with_visibility(ops, columns, new_vis)
186 } else {
187 chunk
188 };
189 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
190 yield Message::Chunk(new_chunk);
191 }
192 } else if let Some(ref cond) = self.condition {
193 let st1 = phase1_handle_chunk::<S, phase1::LeftOuterWithCond>(
195 self.chunk_size,
196 right_size,
197 full_schema,
198 epoch,
199 &mut self.right_table,
200 chunk,
201 );
202 let mut matched_count = 0usize;
203 #[for_await]
204 for chunk in st1 {
205 let chunk = chunk?;
206 let (data_chunk, ops) = chunk.into_parts();
207 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
208 let passed_bitmap =
209 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
210 let (columns, vis) = data_chunk.into_parts();
211 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
212 for (passed, not_match_end) in
213 passed_bitmap.iter().zip_eq_debug(vis.iter())
214 {
215 let is_match_end = !not_match_end;
216 let vis = if is_match_end && matched_count == 0 {
217 true
219 } else if is_match_end {
220 matched_count = 0;
222 false
224 } else {
225 if passed {
226 matched_count += 1;
227 }
228 passed
229 };
230 new_vis.append(vis);
231 }
232 let new_chunk = apply_indices_map(
233 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
234 &self.output_indices,
235 );
236 yield Message::Chunk(new_chunk);
237 }
238 assert_eq!(matched_count, 0);
240 } else {
241 let st1 = phase1_handle_chunk::<S, phase1::LeftOuter>(
242 self.chunk_size,
243 right_size,
244 full_schema,
245 epoch,
246 &mut self.right_table,
247 chunk,
248 );
249 #[for_await]
250 for chunk in st1 {
251 let chunk = chunk?;
252 let new_chunk = apply_indices_map(chunk, &self.output_indices);
253 yield Message::Chunk(new_chunk);
254 }
255 }
256 }
257 InternalMessage::Barrier(chunk, barrier) => {
258 assert!(chunk.is_empty());
259 if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
260 let _vnodes = self.right_table.source.update_vnode_bitmap(vnodes.clone());
261 }
262 prev_epoch = Some(barrier.epoch.curr);
263 yield Message::Barrier(barrier)
264 }
265 }
266 }
267 }
268}
269
270impl<S: StateStore, const T: JoinTypePrimitive> Execute for NestedLoopTemporalJoinExecutor<S, T> {
271 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
272 self.into_stream().boxed()
273 }
274}