// risingwave_stream/from_proto/temporal_join.rs

use std::sync::Arc;

use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::types::DataType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;

use super::*;
use crate::common::table::state_table::{
    ReplicatedStateTable, StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
    ActorContextRef, JoinType, NestedLoopTemporalJoinExecutor, TemporalJoinExecutor,
};
use crate::task::AtomicU64Ref;

/// Stateless builder that turns a `TemporalJoinNode` plan node into a temporal join executor
/// (either the hash-based or the nested-loop variant, depending on the node).
pub struct TemporalJoinExecutorBuilder;

38impl ExecutorBuilder for TemporalJoinExecutorBuilder {
39 type Node = TemporalJoinNode;
40
41 async fn new_boxed_executor(
42 params: ExecutorParams,
43 node: &Self::Node,
44 store: impl StateStore,
45 ) -> StreamResult<Executor> {
46 let table_desc: &StorageTableDesc = node.get_table_desc()?;
47 let condition = match node.get_condition() {
48 Ok(cond_prost) => Some(build_non_strict_from_prost(
49 cond_prost,
50 params.eval_error_report,
51 )?),
52 Err(_) => None,
53 };
54
55 let table_output_indices = node
56 .get_table_output_indices()
57 .iter()
58 .map(|&x| x as usize)
59 .collect_vec();
60
61 let output_indices = node
62 .get_output_indices()
63 .iter()
64 .map(|&x| x as usize)
65 .collect_vec();
66 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
67
68 let versioned = table_desc.versioned;
69 let output_column_ids = table_output_indices
74 .iter()
75 .map(|&x| ColumnId::new(table_desc.columns[x].column_id))
76 .collect_vec();
77 let vnodes = params.vnode_bitmap.clone().map(Arc::new);
78
79 if node.get_is_nested_loop() {
80 macro_rules! build_nested_loop {
81 ($SD:ident) => {{
82 let right_table =
83 StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
84 table_desc,
85 store.clone(),
86 vnodes.clone(),
87 params.fragment_id,
88 )
89 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
90 .with_output_column_ids(output_column_ids.clone())
91 .forbid_preload_all_rows()
92 .build()
93 .await;
94
95 let dispatcher_args = NestedLoopTemporalJoinExecutorDispatcherArgs {
96 ctx: params.actor_context,
97 info: params.info.clone(),
98 left: source_l,
99 right: source_r,
100 right_table,
101 condition,
102 output_indices,
103 chunk_size: params.config.developer.chunk_size,
104 metrics: params.executor_stats,
105 join_type_proto: node.get_join_type()?,
106 };
107 Ok((params.info, dispatcher_args.dispatch()?).into())
108 }};
109 }
110 if versioned {
111 build_nested_loop!(ColumnAwareSerde)
112 } else {
113 build_nested_loop!(BasicSerde)
114 }
115 } else {
116 let table_stream_key_indices = table_desc
120 .stream_key
121 .iter()
122 .map(|&k| {
123 table_output_indices
124 .iter()
125 .position(|&idx| idx == k as usize)
126 .expect("stream key should be included in table output")
127 })
128 .collect_vec();
129
130 let left_join_keys = node
131 .get_left_key()
132 .iter()
133 .map(|key| *key as usize)
134 .collect_vec();
135
136 let right_join_keys = node
137 .get_right_key()
138 .iter()
139 .map(|key| *key as usize)
140 .collect_vec();
141
142 let null_safe = node.get_null_safe().clone();
143
144 let join_key_data_types = left_join_keys
145 .iter()
146 .map(|idx| source_l.schema().fields[*idx].data_type())
147 .collect_vec();
148
149 let memo_table = node.get_memo_table();
150 let memo_table = match memo_table {
151 Ok(memo_table) => {
152 let vnodes = Arc::new(
153 params
154 .vnode_bitmap
155 .expect("vnodes not set for temporal join"),
156 );
157 Some(
158 StateTableBuilder::new(memo_table, store.clone(), Some(vnodes.clone()))
159 .enable_preload_all_rows_by_config(¶ms.config)
160 .build()
161 .await,
162 )
163 }
164 Err(_) => None,
165 };
166 let append_only = memo_table.is_none();
167
168 macro_rules! build_hash {
169 ($SD:ident) => {{
170 let right_table =
171 StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
172 table_desc,
173 store.clone(),
174 vnodes.clone(),
175 params.fragment_id,
176 )
177 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
178 .with_output_column_ids(output_column_ids.clone())
179 .forbid_preload_all_rows()
180 .build()
181 .await;
182
183 let dispatcher_args = TemporalJoinExecutorDispatcherArgs::<_, $SD> {
184 ctx: params.actor_context,
185 info: params.info.clone(),
186 left: source_l,
187 right: source_r,
188 right_table,
189 left_join_keys,
190 right_join_keys,
191 null_safe,
192 condition,
193 output_indices,
194 table_stream_key_indices,
195 watermark_epoch: params.watermark_epoch,
196 chunk_size: params.config.developer.chunk_size,
197 metrics: params.executor_stats,
198 join_type_proto: node.get_join_type()?,
199 join_key_data_types,
200 memo_table,
201 append_only,
202 };
203
204 Ok((params.info, dispatcher_args.dispatch()?).into())
205 }};
206 }
207 if versioned {
208 build_hash!(ColumnAwareSerde)
209 } else {
210 build_hash!(BasicSerde)
211 }
212 }
213 }
214}
215
216struct TemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
217 ctx: ActorContextRef,
218 info: ExecutorInfo,
219 left: Executor,
220 right: Executor,
221 right_table: ReplicatedStateTable<S, SD>,
222 left_join_keys: Vec<usize>,
223 right_join_keys: Vec<usize>,
224 null_safe: Vec<bool>,
225 condition: Option<NonStrictExpression>,
226 output_indices: Vec<usize>,
227 table_stream_key_indices: Vec<usize>,
228 watermark_epoch: AtomicU64Ref,
229 chunk_size: usize,
230 metrics: Arc<StreamingMetrics>,
231 join_type_proto: JoinTypeProto,
232 join_key_data_types: Vec<DataType>,
233 memo_table: Option<StateTable<S>>,
234 append_only: bool,
235}
236
237impl<S: StateStore, SD: ValueRowSerde> HashKeyDispatcher
238 for TemporalJoinExecutorDispatcherArgs<S, SD>
239{
240 type Output = StreamResult<Box<dyn Execute>>;
241
242 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
243 macro_rules! build {
245 ($join_type:ident, $append_only:ident) => {
246 Ok(Box::new(TemporalJoinExecutor::<
247 K,
248 S,
249 SD,
250 { JoinType::$join_type },
251 { $append_only },
252 >::new(
253 self.ctx,
254 self.info,
255 self.left,
256 self.right,
257 self.right_table,
258 self.left_join_keys,
259 self.right_join_keys,
260 self.null_safe,
261 self.condition,
262 self.output_indices,
263 self.table_stream_key_indices,
264 self.watermark_epoch,
265 self.metrics,
266 self.chunk_size,
267 self.join_key_data_types,
268 self.memo_table,
269 )))
270 };
271 }
272 match self.join_type_proto {
273 JoinTypeProto::Inner => {
274 if self.append_only {
275 build!(Inner, true)
276 } else {
277 build!(Inner, false)
278 }
279 }
280 JoinTypeProto::LeftOuter => {
281 if self.append_only {
282 build!(LeftOuter, true)
283 } else {
284 build!(LeftOuter, false)
285 }
286 }
287 _ => unreachable!(),
288 }
289 }
290
291 fn data_types(&self) -> &[DataType] {
292 &self.join_key_data_types
293 }
294}
295
296struct NestedLoopTemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
297 ctx: ActorContextRef,
298 info: ExecutorInfo,
299 left: Executor,
300 right: Executor,
301 right_table: ReplicatedStateTable<S, SD>,
302 condition: Option<NonStrictExpression>,
303 output_indices: Vec<usize>,
304 chunk_size: usize,
305 metrics: Arc<StreamingMetrics>,
306 join_type_proto: JoinTypeProto,
307}
308
309impl<S: StateStore, SD: ValueRowSerde> NestedLoopTemporalJoinExecutorDispatcherArgs<S, SD> {
310 fn dispatch(self) -> StreamResult<Box<dyn Execute>> {
311 macro_rules! build {
313 ($join_type:ident) => {
314 Ok(Box::new(NestedLoopTemporalJoinExecutor::<
315 S,
316 SD,
317 { JoinType::$join_type },
318 >::new(
319 self.ctx,
320 self.info,
321 self.left,
322 self.right,
323 self.right_table,
324 self.condition,
325 self.output_indices,
326 self.metrics,
327 self.chunk_size,
328 )))
329 };
330 }
331 match self.join_type_proto {
332 JoinTypeProto::Inner => {
333 build!(Inner)
334 }
335 JoinTypeProto::LeftOuter => {
336 build!(LeftOuter)
337 }
338 _ => unreachable!(),
339 }
340 }
341}