risingwave_stream/from_proto/
temporal_join.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ColumnId;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::value_encoding::BasicSerde;
21use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
22use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
23use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc};
24use risingwave_storage::row_serde::value_serde::ValueRowSerde;
25
26use super::*;
27use crate::common::table::state_table::{
28 ReplicatedStateTable, StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
29};
30use crate::executor::monitor::StreamingMetrics;
31use crate::executor::{
32 ActorContextRef, JoinType, NestedLoopTemporalJoinExecutor, TemporalJoinExecutor,
33};
34use crate::task::AtomicU64Ref;
35
36pub struct TemporalJoinExecutorBuilder;
37
38impl ExecutorBuilder for TemporalJoinExecutorBuilder {
39 type Node = TemporalJoinNode;
40
41 async fn new_boxed_executor(
42 params: ExecutorParams,
43 node: &Self::Node,
44 store: impl StateStore,
45 ) -> StreamResult<Executor> {
46 let table_desc: &StorageTableDesc = node.get_table_desc()?;
47 let condition = match node.get_condition() {
48 Ok(cond_prost) => Some(build_non_strict_from_prost(
49 cond_prost,
50 params.eval_error_report,
51 )?),
52 Err(_) => None,
53 };
54
55 let table_output_indices = node
56 .get_table_output_indices()
57 .iter()
58 .map(|&x| x as usize)
59 .collect_vec();
60
61 let output_indices = node
62 .get_output_indices()
63 .iter()
64 .map(|&x| x as usize)
65 .collect_vec();
66 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
67
68 let versioned = table_desc.versioned;
69 let output_column_ids = table_output_indices
74 .iter()
75 .map(|&x| ColumnId::new(table_desc.columns[x].column_id))
76 .collect_vec();
77 let vnodes = params.vnode_bitmap.clone().map(Arc::new);
78
79 if node.get_is_nested_loop() {
80 macro_rules! build_nested_loop {
81 ($SD:ident) => {{
82 let right_table =
83 StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
84 table_desc,
85 store.clone(),
86 vnodes.clone(),
87 params.fragment_id.as_raw_id(),
88 )
89 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
90 .with_output_column_ids(output_column_ids.clone())
91 .forbid_preload_all_rows()
92 .build()
93 .await;
94
95 let dispatcher_args = NestedLoopTemporalJoinExecutorDispatcherArgs {
96 ctx: params.actor_context,
97 info: params.info.clone(),
98 left: source_l,
99 right: source_r,
100 right_table,
101 condition,
102 output_indices,
103 chunk_size: params.config.developer.chunk_size,
104 metrics: params.executor_stats,
105 join_type_proto: node.get_join_type()?,
106 };
107 Ok((params.info, dispatcher_args.dispatch()?).into())
108 }};
109 }
110 if versioned {
111 build_nested_loop!(ColumnAwareSerde)
112 } else {
113 build_nested_loop!(BasicSerde)
114 }
115 } else {
116 let table_stream_key_indices = table_desc
117 .stream_key
118 .iter()
119 .map(|&k| k as usize)
120 .collect_vec();
121
122 let left_join_keys = node
123 .get_left_key()
124 .iter()
125 .map(|key| *key as usize)
126 .collect_vec();
127
128 let right_join_keys = node
129 .get_right_key()
130 .iter()
131 .map(|key| *key as usize)
132 .collect_vec();
133
134 let null_safe = node.get_null_safe().clone();
135
136 let join_key_data_types = left_join_keys
137 .iter()
138 .map(|idx| source_l.schema().fields[*idx].data_type())
139 .collect_vec();
140
141 let memo_table = node.get_memo_table();
142 let memo_table = match memo_table {
143 Ok(memo_table) => {
144 let vnodes = Arc::new(
145 params
146 .vnode_bitmap
147 .expect("vnodes not set for temporal join"),
148 );
149 Some(
150 StateTableBuilder::new(memo_table, store.clone(), Some(vnodes.clone()))
151 .enable_preload_all_rows_by_config(¶ms.config)
152 .build()
153 .await,
154 )
155 }
156 Err(_) => None,
157 };
158 let append_only = memo_table.is_none();
159
160 macro_rules! build_hash {
161 ($SD:ident) => {{
162 let right_table =
163 StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
164 table_desc,
165 store.clone(),
166 vnodes.clone(),
167 params.fragment_id.as_raw_id(),
168 )
169 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
170 .with_output_column_ids(output_column_ids.clone())
171 .forbid_preload_all_rows()
172 .build()
173 .await;
174
175 let dispatcher_args = TemporalJoinExecutorDispatcherArgs::<_, $SD> {
176 ctx: params.actor_context,
177 info: params.info.clone(),
178 left: source_l,
179 right: source_r,
180 right_table,
181 left_join_keys,
182 right_join_keys,
183 null_safe,
184 condition,
185 output_indices,
186 table_output_indices,
187 table_stream_key_indices,
188 watermark_epoch: params.watermark_epoch,
189 chunk_size: params.config.developer.chunk_size,
190 metrics: params.executor_stats,
191 join_type_proto: node.get_join_type()?,
192 join_key_data_types,
193 memo_table,
194 append_only,
195 };
196
197 Ok((params.info, dispatcher_args.dispatch()?).into())
198 }};
199 }
200 if versioned {
201 build_hash!(ColumnAwareSerde)
202 } else {
203 build_hash!(BasicSerde)
204 }
205 }
206 }
207}
208
209struct TemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
210 ctx: ActorContextRef,
211 info: ExecutorInfo,
212 left: Executor,
213 right: Executor,
214 right_table: ReplicatedStateTable<S, SD>,
215 left_join_keys: Vec<usize>,
216 right_join_keys: Vec<usize>,
217 null_safe: Vec<bool>,
218 condition: Option<NonStrictExpression>,
219 output_indices: Vec<usize>,
220 table_output_indices: Vec<usize>,
221 table_stream_key_indices: Vec<usize>,
222 watermark_epoch: AtomicU64Ref,
223 chunk_size: usize,
224 metrics: Arc<StreamingMetrics>,
225 join_type_proto: JoinTypeProto,
226 join_key_data_types: Vec<DataType>,
227 memo_table: Option<StateTable<S>>,
228 append_only: bool,
229}
230
231impl<S: StateStore, SD: ValueRowSerde> HashKeyDispatcher
232 for TemporalJoinExecutorDispatcherArgs<S, SD>
233{
234 type Output = StreamResult<Box<dyn Execute>>;
235
236 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
237 macro_rules! build {
239 ($join_type:ident, $append_only:ident) => {
240 Ok(Box::new(TemporalJoinExecutor::<
241 K,
242 S,
243 SD,
244 { JoinType::$join_type },
245 { $append_only },
246 >::new(
247 self.ctx,
248 self.info,
249 self.left,
250 self.right,
251 self.right_table,
252 self.left_join_keys,
253 self.right_join_keys,
254 self.null_safe,
255 self.condition,
256 self.output_indices,
257 self.table_output_indices,
258 self.table_stream_key_indices,
259 self.watermark_epoch,
260 self.metrics,
261 self.chunk_size,
262 self.join_key_data_types,
263 self.memo_table,
264 )))
265 };
266 }
267 match self.join_type_proto {
268 JoinTypeProto::Inner => {
269 if self.append_only {
270 build!(Inner, true)
271 } else {
272 build!(Inner, false)
273 }
274 }
275 JoinTypeProto::LeftOuter => {
276 if self.append_only {
277 build!(LeftOuter, true)
278 } else {
279 build!(LeftOuter, false)
280 }
281 }
282 _ => unreachable!(),
283 }
284 }
285
286 fn data_types(&self) -> &[DataType] {
287 &self.join_key_data_types
288 }
289}
290
291struct NestedLoopTemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
292 ctx: ActorContextRef,
293 info: ExecutorInfo,
294 left: Executor,
295 right: Executor,
296 right_table: ReplicatedStateTable<S, SD>,
297 condition: Option<NonStrictExpression>,
298 output_indices: Vec<usize>,
299 chunk_size: usize,
300 metrics: Arc<StreamingMetrics>,
301 join_type_proto: JoinTypeProto,
302}
303
304impl<S: StateStore, SD: ValueRowSerde> NestedLoopTemporalJoinExecutorDispatcherArgs<S, SD> {
305 fn dispatch(self) -> StreamResult<Box<dyn Execute>> {
306 macro_rules! build {
308 ($join_type:ident) => {
309 Ok(Box::new(NestedLoopTemporalJoinExecutor::<
310 S,
311 SD,
312 { JoinType::$join_type },
313 >::new(
314 self.ctx,
315 self.info,
316 self.left,
317 self.right,
318 self.right_table,
319 self.condition,
320 self.output_indices,
321 self.metrics,
322 self.chunk_size,
323 )))
324 };
325 }
326 match self.join_type_proto {
327 JoinTypeProto::Inner => {
328 build!(Inner)
329 }
330 JoinTypeProto::LeftOuter => {
331 build!(LeftOuter)
332 }
333 _ => unreachable!(),
334 }
335 }
336}