risingwave_stream/from_proto/
asof_join.rs1use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_pb::plan_common::AsOfJoinType as JoinTypeProto;
19use risingwave_pb::stream_plan::AsOfJoinNode;
20
21use super::*;
22use crate::common::table::state_table::StateTableBuilder;
23use crate::executor::asof_join::*;
24use crate::executor::{AsOfCpuEncoding, AsOfDesc, AsOfJoinType, AsOfMemoryEncoding, JoinType};
25
/// Stateless builder type whose [`ExecutorBuilder`] implementation turns an
/// `AsOfJoinNode` plan node into a boxed AsOf join executor.
pub struct AsOfJoinExecutorBuilder;
27
28impl ExecutorBuilder for AsOfJoinExecutorBuilder {
29 type Node = AsOfJoinNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 assert_eq!(AsOfJoinType::Inner, JoinType::Inner);
38 assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter);
39 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join"));
40
41 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
42
43 let table_l = node.get_left_table()?;
44 let table_r = node.get_right_table()?;
45
46 let params_l = JoinParams::new(
47 node.get_left_key()
48 .iter()
49 .map(|key| *key as usize)
50 .collect_vec(),
51 node.get_left_deduped_input_pk_indices()
52 .iter()
53 .map(|key| *key as usize)
54 .collect_vec(),
55 );
56 let params_r = JoinParams::new(
57 node.get_right_key()
58 .iter()
59 .map(|key| *key as usize)
60 .collect_vec(),
61 node.get_right_deduped_input_pk_indices()
62 .iter()
63 .map(|key| *key as usize)
64 .collect_vec(),
65 );
66 let output_indices = node
67 .get_output_indices()
68 .iter()
69 .map(|&x| x as usize)
70 .collect_vec();
71
72 let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
73 .enable_preload_all_rows_by_config(¶ms.config)
74 .build()
75 .await;
76
77 let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
78 .enable_preload_all_rows_by_config(¶ms.config)
79 .build()
80 .await;
81
82 let join_type_proto = node.get_join_type()?;
83 let as_of_desc_proto = node.get_asof_desc()?;
84 let asof_desc = AsOfDesc::from_protobuf(as_of_desc_proto)?;
85
86 let null_safe = node.get_null_safe().clone();
87 let use_cache = node.use_cache.unwrap_or(true);
88
89 #[expect(deprecated)]
92 let join_encoding_type = node
93 .get_join_encoding_type()
94 .map_or(params.config.developer.join_encoding_type, Into::into);
95
96 macro_rules! build {
97 ($join_type:ident, $encoding:ident) => {
98 AsOfJoinExecutor::<_, { AsOfJoinType::$join_type }, $encoding>::new(
99 params.actor_context,
100 params.info.clone(),
101 source_l,
102 source_r,
103 params_l,
104 params_r,
105 null_safe,
106 output_indices,
107 state_table_l,
108 state_table_r,
109 params.watermark_epoch,
110 params.executor_stats,
111 params.config.developer.chunk_size,
112 asof_desc,
113 use_cache,
114 params.config.developer.high_join_amplification_threshold,
115 )
116 .boxed()
117 };
118 }
119
120 let exec = match (join_type_proto, join_encoding_type) {
121 (JoinTypeProto::Inner, JoinEncodingType::Memory) => {
122 build!(Inner, AsOfMemoryEncoding)
123 }
124 (JoinTypeProto::Inner, JoinEncodingType::Cpu) => {
125 build!(Inner, AsOfCpuEncoding)
126 }
127 (JoinTypeProto::LeftOuter, JoinEncodingType::Memory) => {
128 build!(LeftOuter, AsOfMemoryEncoding)
129 }
130 (JoinTypeProto::LeftOuter, JoinEncodingType::Cpu) => {
131 build!(LeftOuter, AsOfCpuEncoding)
132 }
133 (JoinTypeProto::Unspecified, _) => unreachable!(),
134 };
135 Ok((params.info, exec).into())
136 }
137}