Skip to main content

risingwave_stream/from_proto/
asof_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_pb::plan_common::AsOfJoinType as JoinTypeProto;
19use risingwave_pb::stream_plan::AsOfJoinNode;
20
21use super::*;
22use crate::common::table::state_table::StateTableBuilder;
23use crate::executor::asof_join::*;
24use crate::executor::{AsOfCpuEncoding, AsOfDesc, AsOfJoinType, AsOfMemoryEncoding, JoinType};
25
/// Builder that turns an `AsOfJoinNode` plan node into an AsOf join executor
/// through its `ExecutorBuilder` implementation.
pub struct AsOfJoinExecutorBuilder;
27
// NOTE(review): presumably ties the `AsOfJoin` stream node body (carrying an
// `AsOfJoinNode`) to `AsOfJoinExecutorBuilder` — confirm against the macro definition.
impl_stream_node_body!(AsOfJoin(AsOfJoinNode) => AsOfJoinExecutorBuilder);
29
30impl ExecutorBuilder for AsOfJoinExecutorBuilder {
31    type Node = AsOfJoinNode;
32
33    async fn new_boxed_executor(
34        params: ExecutorParams,
35        node: &Self::Node,
36        store: impl StateStore,
37    ) -> StreamResult<Executor> {
38        // This assert is to make sure AsOf join can use `JoinChunkBuilder` as Hash join.
39        assert_eq!(AsOfJoinType::Inner, JoinType::Inner);
40        assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter);
41        let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join"));
42
43        let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
44
45        let table_l = node.get_left_table()?;
46        let table_r = node.get_right_table()?;
47
48        let params_l = JoinParams::new(
49            node.get_left_key()
50                .iter()
51                .map(|key| *key as usize)
52                .collect_vec(),
53            node.get_left_deduped_input_pk_indices()
54                .iter()
55                .map(|key| *key as usize)
56                .collect_vec(),
57        );
58        let params_r = JoinParams::new(
59            node.get_right_key()
60                .iter()
61                .map(|key| *key as usize)
62                .collect_vec(),
63            node.get_right_deduped_input_pk_indices()
64                .iter()
65                .map(|key| *key as usize)
66                .collect_vec(),
67        );
68        let output_indices = node
69            .get_output_indices()
70            .iter()
71            .map(|&x| x as usize)
72            .collect_vec();
73
74        let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
75            .enable_preload_all_rows_by_config(&params.config)
76            .build()
77            .await;
78
79        let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
80            .enable_preload_all_rows_by_config(&params.config)
81            .build()
82            .await;
83
84        let join_type_proto = node.get_join_type()?;
85        let as_of_desc_proto = node.get_asof_desc()?;
86        let asof_desc = AsOfDesc::from_protobuf(as_of_desc_proto)?;
87
88        let null_safe = node.get_null_safe().clone();
89        let use_cache = node.use_cache.unwrap_or(true);
90
91        // Previously, the `join_encoding_type` is persisted in the plan node.
92        // Now it's always `Unspecified` and we should refer to the job's config override.
93        #[expect(deprecated)]
94        let join_encoding_type = node
95            .get_join_encoding_type()
96            .map_or(params.config.developer.join_encoding_type, Into::into);
97
98        macro_rules! build {
99            ($join_type:ident, $encoding:ident) => {
100                AsOfJoinExecutor::<_, { AsOfJoinType::$join_type }, $encoding>::new(
101                    params.actor_context,
102                    params.info.clone(),
103                    source_l,
104                    source_r,
105                    params_l,
106                    params_r,
107                    null_safe,
108                    output_indices,
109                    state_table_l,
110                    state_table_r,
111                    params.watermark_epoch,
112                    params.executor_stats,
113                    params.config.developer.chunk_size,
114                    asof_desc,
115                    use_cache,
116                    params.config.developer.high_join_amplification_threshold,
117                )
118                .boxed()
119            };
120        }
121
122        let exec = match (join_type_proto, join_encoding_type) {
123            (JoinTypeProto::Inner, JoinEncodingType::Memory) => {
124                build!(Inner, AsOfMemoryEncoding)
125            }
126            (JoinTypeProto::Inner, JoinEncodingType::Cpu) => {
127                build!(Inner, AsOfCpuEncoding)
128            }
129            (JoinTypeProto::LeftOuter, JoinEncodingType::Memory) => {
130                build!(LeftOuter, AsOfMemoryEncoding)
131            }
132            (JoinTypeProto::LeftOuter, JoinEncodingType::Cpu) => {
133                build!(LeftOuter, AsOfCpuEncoding)
134            }
135            (JoinTypeProto::Unspecified, _) => unreachable!(),
136        };
137        Ok((params.info, exec).into())
138    }
139}