risingwave_stream/from_proto/
asof_join.rs1use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_pb::plan_common::AsOfJoinType as JoinTypeProto;
19use risingwave_pb::stream_plan::AsOfJoinNode;
20
21use super::*;
22use crate::common::table::state_table::StateTableBuilder;
23use crate::executor::asof_join::*;
24use crate::executor::{AsOfCpuEncoding, AsOfDesc, AsOfJoinType, AsOfMemoryEncoding, JoinType};
25
/// Stateless builder that constructs an AsOf join executor from an `AsOfJoinNode`
/// plan node; all of the work happens in its `ExecutorBuilder` implementation.
pub struct AsOfJoinExecutorBuilder;
27
28impl_stream_node_body!(AsOfJoin(AsOfJoinNode) => AsOfJoinExecutorBuilder);
29
30impl ExecutorBuilder for AsOfJoinExecutorBuilder {
31 type Node = AsOfJoinNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 assert_eq!(AsOfJoinType::Inner, JoinType::Inner);
40 assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter);
41 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join"));
42
43 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
44
45 let table_l = node.get_left_table()?;
46 let table_r = node.get_right_table()?;
47
48 let params_l = JoinParams::new(
49 node.get_left_key()
50 .iter()
51 .map(|key| *key as usize)
52 .collect_vec(),
53 node.get_left_deduped_input_pk_indices()
54 .iter()
55 .map(|key| *key as usize)
56 .collect_vec(),
57 );
58 let params_r = JoinParams::new(
59 node.get_right_key()
60 .iter()
61 .map(|key| *key as usize)
62 .collect_vec(),
63 node.get_right_deduped_input_pk_indices()
64 .iter()
65 .map(|key| *key as usize)
66 .collect_vec(),
67 );
68 let output_indices = node
69 .get_output_indices()
70 .iter()
71 .map(|&x| x as usize)
72 .collect_vec();
73
74 let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
75 .enable_preload_all_rows_by_config(¶ms.config)
76 .build()
77 .await;
78
79 let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
80 .enable_preload_all_rows_by_config(¶ms.config)
81 .build()
82 .await;
83
84 let join_type_proto = node.get_join_type()?;
85 let as_of_desc_proto = node.get_asof_desc()?;
86 let asof_desc = AsOfDesc::from_protobuf(as_of_desc_proto)?;
87
88 let null_safe = node.get_null_safe().clone();
89 let use_cache = node.use_cache.unwrap_or(true);
90
91 #[expect(deprecated)]
94 let join_encoding_type = node
95 .get_join_encoding_type()
96 .map_or(params.config.developer.join_encoding_type, Into::into);
97
98 macro_rules! build {
99 ($join_type:ident, $encoding:ident) => {
100 AsOfJoinExecutor::<_, { AsOfJoinType::$join_type }, $encoding>::new(
101 params.actor_context,
102 params.info.clone(),
103 source_l,
104 source_r,
105 params_l,
106 params_r,
107 null_safe,
108 output_indices,
109 state_table_l,
110 state_table_r,
111 params.watermark_epoch,
112 params.executor_stats,
113 params.config.developer.chunk_size,
114 asof_desc,
115 use_cache,
116 params.config.developer.high_join_amplification_threshold,
117 )
118 .boxed()
119 };
120 }
121
122 let exec = match (join_type_proto, join_encoding_type) {
123 (JoinTypeProto::Inner, JoinEncodingType::Memory) => {
124 build!(Inner, AsOfMemoryEncoding)
125 }
126 (JoinTypeProto::Inner, JoinEncodingType::Cpu) => {
127 build!(Inner, AsOfCpuEncoding)
128 }
129 (JoinTypeProto::LeftOuter, JoinEncodingType::Memory) => {
130 build!(LeftOuter, AsOfMemoryEncoding)
131 }
132 (JoinTypeProto::LeftOuter, JoinEncodingType::Cpu) => {
133 build!(LeftOuter, AsOfCpuEncoding)
134 }
135 (JoinTypeProto::Unspecified, _) => unreachable!(),
136 };
137 Ok((params.info, exec).into())
138 }
139}