risingwave_stream/from_proto/
temporal_join.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ColumnId;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::value_encoding::BasicSerde;
21use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
22use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
23use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc};
24use risingwave_storage::row_serde::value_serde::ValueRowSerde;
25
26use super::*;
27use crate::common::table::state_table::{
28    ReplicatedStateTable, StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
29};
30use crate::executor::monitor::StreamingMetrics;
31use crate::executor::{
32    ActorContextRef, JoinType, NestedLoopTemporalJoinExecutor, TemporalJoinExecutor,
33};
34use crate::task::AtomicU64Ref;
35
/// Builder that turns a `TemporalJoinNode` plan node into a temporal join executor.
///
/// The type carries no state; all work happens in its `ExecutorBuilder` implementation.
pub struct TemporalJoinExecutorBuilder;
37
38impl ExecutorBuilder for TemporalJoinExecutorBuilder {
39    type Node = TemporalJoinNode;
40
41    async fn new_boxed_executor(
42        params: ExecutorParams,
43        node: &Self::Node,
44        store: impl StateStore,
45    ) -> StreamResult<Executor> {
46        let table_desc: &StorageTableDesc = node.get_table_desc()?;
47        let condition = match node.get_condition() {
48            Ok(cond_prost) => Some(build_non_strict_from_prost(
49                cond_prost,
50                params.eval_error_report,
51            )?),
52            Err(_) => None,
53        };
54
55        let table_output_indices = node
56            .get_table_output_indices()
57            .iter()
58            .map(|&x| x as usize)
59            .collect_vec();
60
61        let output_indices = node
62            .get_output_indices()
63            .iter()
64            .map(|&x| x as usize)
65            .collect_vec();
66        let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
67
68        let versioned = table_desc.versioned;
69        // Use table_output_indices to select only the column IDs that the right-side
70        // upstream actually delivers. The planner may prune unused columns from the
71        // right table scan, so the upstream chunks may have fewer columns than the
72        // full table.
73        let output_column_ids = table_output_indices
74            .iter()
75            .map(|&x| ColumnId::new(table_desc.columns[x].column_id))
76            .collect_vec();
77        let vnodes = params.vnode_bitmap.clone().map(Arc::new);
78
79        if node.get_is_nested_loop() {
80            macro_rules! build_nested_loop {
81                ($SD:ident) => {{
82                    let right_table =
83                        StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
84                            table_desc,
85                            store.clone(),
86                            vnodes.clone(),
87                            params.fragment_id.as_raw_id(),
88                        )
89                        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
90                        .with_output_column_ids(output_column_ids.clone())
91                        .forbid_preload_all_rows()
92                        .build()
93                        .await;
94
95                    let dispatcher_args = NestedLoopTemporalJoinExecutorDispatcherArgs {
96                        ctx: params.actor_context,
97                        info: params.info.clone(),
98                        left: source_l,
99                        right: source_r,
100                        right_table,
101                        condition,
102                        output_indices,
103                        chunk_size: params.config.developer.chunk_size,
104                        metrics: params.executor_stats,
105                        join_type_proto: node.get_join_type()?,
106                    };
107                    Ok((params.info, dispatcher_args.dispatch()?).into())
108                }};
109            }
110            if versioned {
111                build_nested_loop!(ColumnAwareSerde)
112            } else {
113                build_nested_loop!(BasicSerde)
114            }
115        } else {
116            let table_stream_key_indices = table_desc
117                .stream_key
118                .iter()
119                .map(|&k| k as usize)
120                .collect_vec();
121
122            let left_join_keys = node
123                .get_left_key()
124                .iter()
125                .map(|key| *key as usize)
126                .collect_vec();
127
128            let right_join_keys = node
129                .get_right_key()
130                .iter()
131                .map(|key| *key as usize)
132                .collect_vec();
133
134            let null_safe = node.get_null_safe().clone();
135
136            let join_key_data_types = left_join_keys
137                .iter()
138                .map(|idx| source_l.schema().fields[*idx].data_type())
139                .collect_vec();
140
141            let memo_table = node.get_memo_table();
142            let memo_table = match memo_table {
143                Ok(memo_table) => {
144                    let vnodes = Arc::new(
145                        params
146                            .vnode_bitmap
147                            .expect("vnodes not set for temporal join"),
148                    );
149                    Some(
150                        StateTableBuilder::new(memo_table, store.clone(), Some(vnodes.clone()))
151                            .enable_preload_all_rows_by_config(&params.config)
152                            .build()
153                            .await,
154                    )
155                }
156                Err(_) => None,
157            };
158            let append_only = memo_table.is_none();
159
160            macro_rules! build_hash {
161                ($SD:ident) => {{
162                    let right_table =
163                        StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
164                            table_desc,
165                            store.clone(),
166                            vnodes.clone(),
167                            params.fragment_id.as_raw_id(),
168                        )
169                        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
170                        .with_output_column_ids(output_column_ids.clone())
171                        .forbid_preload_all_rows()
172                        .build()
173                        .await;
174
175                    let dispatcher_args = TemporalJoinExecutorDispatcherArgs::<_, $SD> {
176                        ctx: params.actor_context,
177                        info: params.info.clone(),
178                        left: source_l,
179                        right: source_r,
180                        right_table,
181                        left_join_keys,
182                        right_join_keys,
183                        null_safe,
184                        condition,
185                        output_indices,
186                        table_output_indices,
187                        table_stream_key_indices,
188                        watermark_epoch: params.watermark_epoch,
189                        chunk_size: params.config.developer.chunk_size,
190                        metrics: params.executor_stats,
191                        join_type_proto: node.get_join_type()?,
192                        join_key_data_types,
193                        memo_table,
194                        append_only,
195                    };
196
197                    Ok((params.info, dispatcher_args.dispatch()?).into())
198                }};
199            }
200            if versioned {
201                build_hash!(ColumnAwareSerde)
202            } else {
203                build_hash!(BasicSerde)
204            }
205        }
206    }
207}
208
209struct TemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
210    ctx: ActorContextRef,
211    info: ExecutorInfo,
212    left: Executor,
213    right: Executor,
214    right_table: ReplicatedStateTable<S, SD>,
215    left_join_keys: Vec<usize>,
216    right_join_keys: Vec<usize>,
217    null_safe: Vec<bool>,
218    condition: Option<NonStrictExpression>,
219    output_indices: Vec<usize>,
220    table_output_indices: Vec<usize>,
221    table_stream_key_indices: Vec<usize>,
222    watermark_epoch: AtomicU64Ref,
223    chunk_size: usize,
224    metrics: Arc<StreamingMetrics>,
225    join_type_proto: JoinTypeProto,
226    join_key_data_types: Vec<DataType>,
227    memo_table: Option<StateTable<S>>,
228    append_only: bool,
229}
230
231impl<S: StateStore, SD: ValueRowSerde> HashKeyDispatcher
232    for TemporalJoinExecutorDispatcherArgs<S, SD>
233{
234    type Output = StreamResult<Box<dyn Execute>>;
235
236    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
237        /// This macro helps to fill the const generic type parameter.
238        macro_rules! build {
239            ($join_type:ident, $append_only:ident) => {
240                Ok(Box::new(TemporalJoinExecutor::<
241                    K,
242                    S,
243                    SD,
244                    { JoinType::$join_type },
245                    { $append_only },
246                >::new(
247                    self.ctx,
248                    self.info,
249                    self.left,
250                    self.right,
251                    self.right_table,
252                    self.left_join_keys,
253                    self.right_join_keys,
254                    self.null_safe,
255                    self.condition,
256                    self.output_indices,
257                    self.table_output_indices,
258                    self.table_stream_key_indices,
259                    self.watermark_epoch,
260                    self.metrics,
261                    self.chunk_size,
262                    self.join_key_data_types,
263                    self.memo_table,
264                )))
265            };
266        }
267        match self.join_type_proto {
268            JoinTypeProto::Inner => {
269                if self.append_only {
270                    build!(Inner, true)
271                } else {
272                    build!(Inner, false)
273                }
274            }
275            JoinTypeProto::LeftOuter => {
276                if self.append_only {
277                    build!(LeftOuter, true)
278                } else {
279                    build!(LeftOuter, false)
280                }
281            }
282            _ => unreachable!(),
283        }
284    }
285
286    fn data_types(&self) -> &[DataType] {
287        &self.join_key_data_types
288    }
289}
290
291struct NestedLoopTemporalJoinExecutorDispatcherArgs<S: StateStore, SD: ValueRowSerde> {
292    ctx: ActorContextRef,
293    info: ExecutorInfo,
294    left: Executor,
295    right: Executor,
296    right_table: ReplicatedStateTable<S, SD>,
297    condition: Option<NonStrictExpression>,
298    output_indices: Vec<usize>,
299    chunk_size: usize,
300    metrics: Arc<StreamingMetrics>,
301    join_type_proto: JoinTypeProto,
302}
303
304impl<S: StateStore, SD: ValueRowSerde> NestedLoopTemporalJoinExecutorDispatcherArgs<S, SD> {
305    fn dispatch(self) -> StreamResult<Box<dyn Execute>> {
306        /// This macro helps to fill the const generic type parameter.
307        macro_rules! build {
308            ($join_type:ident) => {
309                Ok(Box::new(NestedLoopTemporalJoinExecutor::<
310                    S,
311                    SD,
312                    { JoinType::$join_type },
313                >::new(
314                    self.ctx,
315                    self.info,
316                    self.left,
317                    self.right,
318                    self.right_table,
319                    self.condition,
320                    self.output_indices,
321                    self.metrics,
322                    self.chunk_size,
323                )))
324            };
325        }
326        match self.join_type_proto {
327            JoinTypeProto::Inner => {
328                build!(Inner)
329            }
330            JoinTypeProto::LeftOuter => {
331                build!(LeftOuter)
332            }
333            _ => unreachable!(),
334        }
335    }
336}