risingwave_stream/from_proto/
temporal_join.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ColumnId;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::value_encoding::BasicSerde;
21use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
22use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
23use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc};
24use risingwave_storage::row_serde::value_serde::ValueRowSerde;
25
26use super::*;
27use crate::common::table::state_table::{
28    ReplicatedStateTable, StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
29};
30use crate::executor::monitor::StreamingMetrics;
31use crate::executor::{
32    ActorContextRef, JoinType, NestedLoopTemporalJoinExecutor, TemporalJoinExecutor,
33};
34use crate::task::AtomicU64Ref;
35
36pub struct TemporalJoinExecutorBuilder;
37
38impl ExecutorBuilder for TemporalJoinExecutorBuilder {
39    type Node = TemporalJoinNode;
40
41    async fn new_boxed_executor(
42        params: ExecutorParams,
43        node: &Self::Node,
44        store: impl StateStore,
45    ) -> StreamResult<Executor> {
46        let table_desc: &StorageTableDesc = node.get_table_desc()?;
47        let condition = match node.get_condition() {
48            Ok(cond_prost) => Some(build_non_strict_from_prost(
49                cond_prost,
50                params.eval_error_report,
51            )?),
52            Err(_) => None,
53        };
54
55        let table_output_indices = node
56            .get_table_output_indices()
57            .iter()
58            .map(|&x| x as usize)
59            .collect_vec();
60
61        let output_indices = node
62            .get_output_indices()
63            .iter()
64            .map(|&x| x as usize)
65            .collect_vec();
66        let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
67
68        let versioned = table_desc.versioned;
69        // Use table_output_indices to select only the column IDs that the right-side
70        // upstream actually delivers. The planner may prune unused columns from the
71        // right table scan, so the upstream chunks may have fewer columns than the
72        // full table.
73        let output_column_ids = table_output_indices
74            .iter()
75            .map(|&x| ColumnId::new(table_desc.columns[x].column_id))
76            .collect_vec();
77        let vnodes = params.vnode_bitmap.clone().map(Arc::new);
78
79        if node.get_is_nested_loop() {
80            macro_rules! build_nested_loop {
81                ($SD:ident) => {{
82                    let right_table =
83                        StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
84                            table_desc,
85                            store.clone(),
86                            vnodes.clone(),
87                            params.fragment_id,
88                        )
89                        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
90                        .with_output_column_ids(output_column_ids.clone())
91                        .forbid_preload_all_rows()
92                        .build()
93                        .await;
94
95                    let dispatcher_args = NestedLoopTemporalJoinExecutorDispatcherArgs {
96                        ctx: params.actor_context,
97                        info: params.info.clone(),
98                        left: source_l,
99                        right: source_r,
100                        right_table,
101                        condition,
102                        output_indices,
103                        chunk_size: params.config.developer.chunk_size,
104                        metrics: params.executor_stats,
105                        join_type_proto: node.get_join_type()?,
106                    };
107                    Ok((params.info, dispatcher_args.dispatch()?).into())
108                }};
109            }
110            if versioned {
111                build_nested_loop!(ColumnAwareSerde)
112            } else {
113                build_nested_loop!(BasicSerde)
114            }
115        } else {
116            // `ReplicatedStateTable::iter_with_prefix` returns rows in `table_output_indices`
117            // order. The hash temporal join cache therefore needs stream-key positions within
118            // that projected row, not within the full table schema.
119            let table_stream_key_indices = table_desc
120                .stream_key
121                .iter()
122                .map(|&k| {
123                    table_output_indices
124                        .iter()
125                        .position(|&idx| idx == k as usize)
126                        .expect("stream key should be included in table output")
127                })
128                .collect_vec();
129
130            let left_join_keys = node
131                .get_left_key()
132                .iter()
133                .map(|key| *key as usize)
134                .collect_vec();
135
136            let right_join_keys = node
137                .get_right_key()
138                .iter()
139                .map(|key| *key as usize)
140                .collect_vec();
141
142            let null_safe = node.get_null_safe().clone();
143
144            let join_key_data_types = left_join_keys
145                .iter()
146                .map(|idx| source_l.schema().fields[*idx].data_type())
147                .collect_vec();
148
149            let memo_table = node.get_memo_table();
150            let memo_table = match memo_table {
151                Ok(memo_table) => {
152                    let vnodes = Arc::new(
153                        params
154                            .vnode_bitmap
155                            .expect("vnodes not set for temporal join"),
156                    );
157                    Some(
158                        StateTableBuilder::new(memo_table, store.clone(), Some(vnodes.clone()))
159                            .enable_preload_all_rows_by_config(&params.config)
160                            .build()
161                            .await,
162                    )
163                }
164                Err(_) => None,
165            };
166            let append_only = memo_table.is_none();
167
168            macro_rules! build_hash {
169                ($SD:ident) => {{
170                    let right_table =
171                        StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
172                            table_desc,
173                            store.clone(),
174                            vnodes.clone(),
175                            params.fragment_id,
176                        )
177                        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
178                        .with_output_column_ids(output_column_ids.clone())
179                        .forbid_preload_all_rows()
180                        .build()
181                        .await;
182
183                    let dispatcher_args = TemporalJoinExecutorDispatcherArgs::<_, $SD> {
184                        ctx: params.actor_context,
185                        info: params.info.clone(),
186                        left: source_l,
187                        right: source_r,
188                        right_table,
189                        left_join_keys,
190                        right_join_keys,
191                        null_safe,
192                        condition,
193                        output_indices,
194                        table_stream_key_indices,
195                        watermark_epoch: params.watermark_epoch,
196                        chunk_size: params.config.developer.chunk_size,
197                        metrics: params.executor_stats,
198                        join_type_proto: node.get_join_type()?,
199                        join_key_data_types,
200                        memo_table,
201                        append_only,
202                    };
203
204                    Ok((params.info, dispatcher_args.dispatch()?).into())
205                }};
206            }
207            if versioned {
208                build_hash!(ColumnAwareSerde)
209            } else {
210                build_hash!(BasicSerde)
211            }
212        }
213    }
214}
215
216struct TemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
217    ctx: ActorContextRef,
218    info: ExecutorInfo,
219    left: Executor,
220    right: Executor,
221    right_table: ReplicatedStateTable<S, SD>,
222    left_join_keys: Vec<usize>,
223    right_join_keys: Vec<usize>,
224    null_safe: Vec<bool>,
225    condition: Option<NonStrictExpression>,
226    output_indices: Vec<usize>,
227    table_stream_key_indices: Vec<usize>,
228    watermark_epoch: AtomicU64Ref,
229    chunk_size: usize,
230    metrics: Arc<StreamingMetrics>,
231    join_type_proto: JoinTypeProto,
232    join_key_data_types: Vec<DataType>,
233    memo_table: Option<StateTable<S>>,
234    append_only: bool,
235}
236
237impl<S: StateStore, SD: ValueRowSerde> HashKeyDispatcher
238    for TemporalJoinExecutorDispatcherArgs<S, SD>
239{
240    type Output = StreamResult<Box<dyn Execute>>;
241
242    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
243        /// This macro helps to fill the const generic type parameter.
244        macro_rules! build {
245            ($join_type:ident, $append_only:ident) => {
246                Ok(Box::new(TemporalJoinExecutor::<
247                    K,
248                    S,
249                    SD,
250                    { JoinType::$join_type },
251                    { $append_only },
252                >::new(
253                    self.ctx,
254                    self.info,
255                    self.left,
256                    self.right,
257                    self.right_table,
258                    self.left_join_keys,
259                    self.right_join_keys,
260                    self.null_safe,
261                    self.condition,
262                    self.output_indices,
263                    self.table_stream_key_indices,
264                    self.watermark_epoch,
265                    self.metrics,
266                    self.chunk_size,
267                    self.join_key_data_types,
268                    self.memo_table,
269                )))
270            };
271        }
272        match self.join_type_proto {
273            JoinTypeProto::Inner => {
274                if self.append_only {
275                    build!(Inner, true)
276                } else {
277                    build!(Inner, false)
278                }
279            }
280            JoinTypeProto::LeftOuter => {
281                if self.append_only {
282                    build!(LeftOuter, true)
283                } else {
284                    build!(LeftOuter, false)
285                }
286            }
287            _ => unreachable!(),
288        }
289    }
290
291    fn data_types(&self) -> &[DataType] {
292        &self.join_key_data_types
293    }
294}
295
296struct NestedLoopTemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
297    ctx: ActorContextRef,
298    info: ExecutorInfo,
299    left: Executor,
300    right: Executor,
301    right_table: ReplicatedStateTable<S, SD>,
302    condition: Option<NonStrictExpression>,
303    output_indices: Vec<usize>,
304    chunk_size: usize,
305    metrics: Arc<StreamingMetrics>,
306    join_type_proto: JoinTypeProto,
307}
308
309impl<S: StateStore, SD: ValueRowSerde> NestedLoopTemporalJoinExecutorDispatcherArgs<S, SD> {
310    fn dispatch(self) -> StreamResult<Box<dyn Execute>> {
311        /// This macro helps to fill the const generic type parameter.
312        macro_rules! build {
313            ($join_type:ident) => {
314                Ok(Box::new(NestedLoopTemporalJoinExecutor::<
315                    S,
316                    SD,
317                    { JoinType::$join_type },
318                >::new(
319                    self.ctx,
320                    self.info,
321                    self.left,
322                    self.right,
323                    self.right_table,
324                    self.condition,
325                    self.output_indices,
326                    self.metrics,
327                    self.chunk_size,
328                )))
329            };
330        }
331        match self.join_type_proto {
332            JoinTypeProto::Inner => {
333                build!(Inner)
334            }
335            JoinTypeProto::LeftOuter => {
336                build!(LeftOuter)
337            }
338            _ => unreachable!(),
339        }
340    }
341}