risingwave_stream/from_proto/
asof_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_pb::plan_common::AsOfJoinType as JoinTypeProto;
19use risingwave_pb::stream_plan::AsOfJoinNode;
20
21use super::*;
22use crate::common::table::state_table::StateTableBuilder;
23use crate::executor::asof_join::*;
24use crate::executor::{AsOfCpuEncoding, AsOfDesc, AsOfJoinType, AsOfMemoryEncoding, JoinType};
25
26pub struct AsOfJoinExecutorBuilder;
27
28impl ExecutorBuilder for AsOfJoinExecutorBuilder {
29    type Node = AsOfJoinNode;
30
31    async fn new_boxed_executor(
32        params: ExecutorParams,
33        node: &Self::Node,
34        store: impl StateStore,
35    ) -> StreamResult<Executor> {
36        // This assert is to make sure AsOf join can use `JoinChunkBuilder` as Hash join.
37        assert_eq!(AsOfJoinType::Inner, JoinType::Inner);
38        assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter);
39        let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join"));
40
41        let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
42
43        let table_l = node.get_left_table()?;
44        let table_r = node.get_right_table()?;
45
46        let params_l = JoinParams::new(
47            node.get_left_key()
48                .iter()
49                .map(|key| *key as usize)
50                .collect_vec(),
51            node.get_left_deduped_input_pk_indices()
52                .iter()
53                .map(|key| *key as usize)
54                .collect_vec(),
55        );
56        let params_r = JoinParams::new(
57            node.get_right_key()
58                .iter()
59                .map(|key| *key as usize)
60                .collect_vec(),
61            node.get_right_deduped_input_pk_indices()
62                .iter()
63                .map(|key| *key as usize)
64                .collect_vec(),
65        );
66        let output_indices = node
67            .get_output_indices()
68            .iter()
69            .map(|&x| x as usize)
70            .collect_vec();
71
72        let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
73            .enable_preload_all_rows_by_config(&params.config)
74            .build()
75            .await;
76
77        let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
78            .enable_preload_all_rows_by_config(&params.config)
79            .build()
80            .await;
81
82        let join_type_proto = node.get_join_type()?;
83        let as_of_desc_proto = node.get_asof_desc()?;
84        let asof_desc = AsOfDesc::from_protobuf(as_of_desc_proto)?;
85
86        let null_safe = node.get_null_safe().clone();
87        let use_cache = node.use_cache.unwrap_or(true);
88
89        // Previously, the `join_encoding_type` is persisted in the plan node.
90        // Now it's always `Unspecified` and we should refer to the job's config override.
91        #[expect(deprecated)]
92        let join_encoding_type = node
93            .get_join_encoding_type()
94            .map_or(params.config.developer.join_encoding_type, Into::into);
95
96        macro_rules! build {
97            ($join_type:ident, $encoding:ident) => {
98                AsOfJoinExecutor::<_, { AsOfJoinType::$join_type }, $encoding>::new(
99                    params.actor_context,
100                    params.info.clone(),
101                    source_l,
102                    source_r,
103                    params_l,
104                    params_r,
105                    null_safe,
106                    output_indices,
107                    state_table_l,
108                    state_table_r,
109                    params.watermark_epoch,
110                    params.executor_stats,
111                    params.config.developer.chunk_size,
112                    asof_desc,
113                    use_cache,
114                    params.config.developer.high_join_amplification_threshold,
115                )
116                .boxed()
117            };
118        }
119
120        let exec = match (join_type_proto, join_encoding_type) {
121            (JoinTypeProto::Inner, JoinEncodingType::Memory) => {
122                build!(Inner, AsOfMemoryEncoding)
123            }
124            (JoinTypeProto::Inner, JoinEncodingType::Cpu) => {
125                build!(Inner, AsOfCpuEncoding)
126            }
127            (JoinTypeProto::LeftOuter, JoinEncodingType::Memory) => {
128                build!(LeftOuter, AsOfMemoryEncoding)
129            }
130            (JoinTypeProto::LeftOuter, JoinEncodingType::Cpu) => {
131                build!(LeftOuter, AsOfCpuEncoding)
132            }
133            (JoinTypeProto::Unspecified, _) => unreachable!(),
134        };
135        Ok((params.info, exec).into())
136    }
137}