risingwave_stream/from_proto/
hash_join.rs1use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
21use risingwave_pb::plan_common::JoinType as JoinTypeProto;
22use risingwave_pb::stream_plan::{
23 HashJoinNode, HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex,
24};
25
26use super::*;
27use crate::common::table::state_table::{StateTable, StateTableBuilder};
28use crate::executor::hash_join::*;
29use crate::executor::monitor::StreamingMetrics;
30use crate::executor::{ActorContextRef, CpuEncoding, JoinType, MemoryEncoding};
31use crate::task::AtomicU64Ref;
32
33pub struct HashJoinExecutorBuilder;
34
35impl_stream_node_body!(HashJoin(HashJoinNode) => HashJoinExecutorBuilder);
36
37impl ExecutorBuilder for HashJoinExecutorBuilder {
38 type Node = HashJoinNode;
39
40 async fn new_boxed_executor(
41 params: ExecutorParams,
42 node: &Self::Node,
43 store: impl StateStore,
44 ) -> StreamResult<Executor> {
45 let is_append_only = node.is_append_only;
46 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for hash join"));
47
48 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
49
50 let table_l = node.get_left_table()?;
51 let degree_table_l = node.get_left_degree_table()?;
52
53 let table_r = node.get_right_table()?;
54 let degree_table_r = node.get_right_degree_table()?;
55
56 let params_l = JoinParams::new(
57 node.get_left_key()
58 .iter()
59 .map(|key| *key as usize)
60 .collect_vec(),
61 node.get_left_deduped_input_pk_indices()
62 .iter()
63 .map(|key| *key as usize)
64 .collect_vec(),
65 );
66 let params_r = JoinParams::new(
67 node.get_right_key()
68 .iter()
69 .map(|key| *key as usize)
70 .collect_vec(),
71 node.get_right_deduped_input_pk_indices()
72 .iter()
73 .map(|key| *key as usize)
74 .collect_vec(),
75 );
76 let null_safe = node.get_null_safe().clone();
77 let output_indices = node
78 .get_output_indices()
79 .iter()
80 .map(|&x| x as usize)
81 .collect_vec();
82
83 let condition = match node.get_condition() {
84 Ok(cond_prost) => Some(build_non_strict_from_prost(
85 cond_prost,
86 params.eval_error_report.clone(),
87 )?),
88 Err(_) => None,
89 };
90 trace!("Join non-equi condition: {:?}", condition);
91 let mut inequality_pairs = vec![];
92 if let Some(desc) = node.watermark_handle_desc.as_ref() {
93 inequality_pairs = desc
94 .inequality_pairs
95 .iter()
96 .map(|inequality_pair| InequalityPairInfo {
97 left_idx: inequality_pair.left_idx as usize,
98 right_idx: inequality_pair.right_idx as usize,
99 clean_left_state: inequality_pair.clean_left_state,
100 clean_right_state: inequality_pair.clean_right_state,
101 op: inequality_pair.op(),
102 })
103 .collect();
104 }
105
106 let watermark_indices_in_jk = resolve_clean_watermark_indices_in_jk(
107 node.watermark_handle_desc.as_ref(),
108 params_l.join_key_indices.len(),
109 );
110
111 let join_key_data_types = params_l
112 .join_key_indices
113 .iter()
114 .map(|idx| source_l.schema().fields[*idx].data_type())
115 .collect_vec();
116
117 let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
118 .enable_preload_all_rows_by_config(¶ms.config)
119 .build()
120 .await;
121 let degree_state_table_l =
122 StateTableBuilder::new(degree_table_l, store.clone(), Some(vnodes.clone()))
123 .enable_preload_all_rows_by_config(¶ms.config)
124 .build()
125 .await;
126
127 let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
128 .enable_preload_all_rows_by_config(¶ms.config)
129 .build()
130 .await;
131 let degree_state_table_r = StateTableBuilder::new(degree_table_r, store, Some(vnodes))
132 .enable_preload_all_rows_by_config(¶ms.config)
133 .build()
134 .await;
135
136 #[expect(deprecated)]
139 let join_encoding_type = node
140 .get_join_encoding_type()
141 .map_or(params.config.developer.join_encoding_type, Into::into);
142
143 let args = HashJoinExecutorDispatcherArgs {
144 ctx: params.actor_context,
145 info: params.info.clone(),
146 source_l,
147 source_r,
148 params_l,
149 params_r,
150 null_safe,
151 output_indices,
152 cond: condition,
153 inequality_pairs,
154 state_table_l,
155 degree_state_table_l,
156 state_table_r,
157 degree_state_table_r,
158 lru_manager: params.watermark_epoch,
159 is_append_only,
160 metrics: params.executor_stats,
161 join_type_proto: node.get_join_type()?,
162 join_key_data_types,
163 chunk_size: params.config.developer.chunk_size,
164 high_join_amplification_threshold: (params.config.developer)
165 .high_join_amplification_threshold,
166 join_encoding_type,
167 watermark_indices_in_jk,
168 };
169
170 let exec = args.dispatch()?;
171 Ok((params.info, exec).into())
172 }
173}
174
175struct HashJoinExecutorDispatcherArgs<S: StateStore> {
176 ctx: ActorContextRef,
177 info: ExecutorInfo,
178 source_l: Executor,
179 source_r: Executor,
180 params_l: JoinParams,
181 params_r: JoinParams,
182 null_safe: Vec<bool>,
183 output_indices: Vec<usize>,
184 cond: Option<NonStrictExpression>,
185 inequality_pairs: Vec<InequalityPairInfo>,
186 state_table_l: StateTable<S>,
187 degree_state_table_l: StateTable<S>,
188 state_table_r: StateTable<S>,
189 degree_state_table_r: StateTable<S>,
190 lru_manager: AtomicU64Ref,
191 is_append_only: bool,
192 metrics: Arc<StreamingMetrics>,
193 join_type_proto: JoinTypeProto,
194 join_key_data_types: Vec<DataType>,
195 chunk_size: usize,
196 high_join_amplification_threshold: usize,
197 join_encoding_type: JoinEncodingType,
198 watermark_indices_in_jk: Vec<(usize, bool)>,
199}
200
201fn resolve_clean_watermark_indices_in_jk(
202 desc: Option<&HashJoinWatermarkHandleDesc>,
203 join_key_len: usize,
204) -> Vec<(usize, bool)> {
205 if let Some(desc) = desc {
206 return desc
207 .watermark_indices_in_jk
208 .iter()
209 .map(
210 |JoinKeyWatermarkIndex {
211 index,
212 do_state_cleaning,
213 }| (*index as usize, *do_state_cleaning),
214 )
215 .collect_vec();
216 }
217 if join_key_len > 0 {
220 vec![(0, true)]
221 } else {
222 vec![]
223 }
224}
225
226impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
227 type Output = StreamResult<Box<dyn Execute>>;
228
229 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
230 macro_rules! build {
232 ($join_type:ident, $join_encoding:ident) => {
233 Ok(
234 HashJoinExecutor::<K, S, { JoinType::$join_type }, $join_encoding>::new(
235 self.ctx,
236 self.info,
237 self.source_l,
238 self.source_r,
239 self.params_l,
240 self.params_r,
241 self.null_safe,
242 self.output_indices,
243 self.cond,
244 self.inequality_pairs,
245 self.state_table_l,
246 self.degree_state_table_l,
247 self.state_table_r,
248 self.degree_state_table_r,
249 self.lru_manager,
250 self.is_append_only,
251 self.metrics,
252 self.chunk_size,
253 self.high_join_amplification_threshold,
254 self.watermark_indices_in_jk.clone(),
255 )
256 .boxed(),
257 )
258 };
259 }
260
261 macro_rules! build_match {
262 ($($join_type:ident),*) => {
263 match (self.join_type_proto, self.join_encoding_type) {
264 (JoinTypeProto::AsofInner, _)
265 | (JoinTypeProto::AsofLeftOuter, _)
266 | (JoinTypeProto::Unspecified, _) => unreachable!(),
267 $(
268 (JoinTypeProto::$join_type, JoinEncodingType::Memory) => build!($join_type, MemoryEncoding),
269 (JoinTypeProto::$join_type, JoinEncodingType::Cpu) => build!($join_type, CpuEncoding),
270 )*
271 }
272 };
273 }
274 build_match! {
275 Inner,
276 LeftOuter,
277 RightOuter,
278 FullOuter,
279 LeftSemi,
280 LeftAnti,
281 RightSemi,
282 RightAnti
283 }
284 }
285
286 fn data_types(&self) -> &[DataType] {
287 &self.join_key_data_types
288 }
289}
290
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::{HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex};

    use super::resolve_clean_watermark_indices_in_jk;

    /// An explicit descriptor is passed through unchanged, keeping order and per-index flags.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_from_desc() {
        let watermark_indices_in_jk = [(2, true), (1, false)]
            .into_iter()
            .map(|(index, do_state_cleaning)| JoinKeyWatermarkIndex {
                index,
                do_state_cleaning,
            })
            .collect();
        let desc = HashJoinWatermarkHandleDesc {
            watermark_indices_in_jk,
            inequality_pairs: vec![],
        };

        let resolved = resolve_clean_watermark_indices_in_jk(Some(&desc), 3);
        assert_eq!(resolved, vec![(2, true), (1, false)]);
    }

    /// Without a descriptor, the first join key column is used with state cleaning enabled.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_default_first() {
        let resolved = resolve_clean_watermark_indices_in_jk(None, 2);
        assert_eq!(resolved, vec![(0, true)]);
    }

    /// Without a descriptor and without any join key, nothing is resolved.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_empty_on_no_keys() {
        assert!(resolve_clean_watermark_indices_in_jk(None, 0).is_empty());
    }
}