risingwave_stream/from_proto/
hash_join.rs1use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
21use risingwave_pb::plan_common::JoinType as JoinTypeProto;
22use risingwave_pb::stream_plan::{
23 HashJoinNode, HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex,
24};
25
26use super::*;
27use crate::common::table::state_table::{StateTable, StateTableBuilder};
28use crate::executor::hash_join::*;
29use crate::executor::monitor::StreamingMetrics;
30use crate::executor::{ActorContextRef, CpuEncoding, JoinType, MemoryEncoding};
31use crate::task::AtomicU64Ref;
32
33pub struct HashJoinExecutorBuilder;
34
35impl ExecutorBuilder for HashJoinExecutorBuilder {
36 type Node = HashJoinNode;
37
38 async fn new_boxed_executor(
39 params: ExecutorParams,
40 node: &Self::Node,
41 store: impl StateStore,
42 ) -> StreamResult<Executor> {
43 let is_append_only = node.is_append_only;
44 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for hash join"));
45
46 let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
47
48 let table_l = node.get_left_table()?;
49 let degree_table_l = node.get_left_degree_table()?;
50
51 let table_r = node.get_right_table()?;
52 let degree_table_r = node.get_right_degree_table()?;
53
54 let params_l = JoinParams::new(
55 node.get_left_key()
56 .iter()
57 .map(|key| *key as usize)
58 .collect_vec(),
59 node.get_left_deduped_input_pk_indices()
60 .iter()
61 .map(|key| *key as usize)
62 .collect_vec(),
63 );
64 let params_r = JoinParams::new(
65 node.get_right_key()
66 .iter()
67 .map(|key| *key as usize)
68 .collect_vec(),
69 node.get_right_deduped_input_pk_indices()
70 .iter()
71 .map(|key| *key as usize)
72 .collect_vec(),
73 );
74 let null_safe = node.get_null_safe().clone();
75 let output_indices = node
76 .get_output_indices()
77 .iter()
78 .map(|&x| x as usize)
79 .collect_vec();
80
81 let condition = match node.get_condition() {
82 Ok(cond_prost) => Some(build_non_strict_from_prost(
83 cond_prost,
84 params.eval_error_report.clone(),
85 )?),
86 Err(_) => None,
87 };
88 trace!("Join non-equi condition: {:?}", condition);
89 let mut inequality_pairs = vec![];
90 if let Some(desc) = node.watermark_handle_desc.as_ref() {
91 inequality_pairs = desc
92 .inequality_pairs
93 .iter()
94 .map(|inequality_pair| InequalityPairInfo {
95 left_idx: inequality_pair.left_idx as usize,
96 right_idx: inequality_pair.right_idx as usize,
97 clean_left_state: inequality_pair.clean_left_state,
98 clean_right_state: inequality_pair.clean_right_state,
99 op: inequality_pair.op(),
100 })
101 .collect();
102 }
103
104 let watermark_indices_in_jk = resolve_clean_watermark_indices_in_jk(
105 node.watermark_handle_desc.as_ref(),
106 params_l.join_key_indices.len(),
107 );
108
109 let join_key_data_types = params_l
110 .join_key_indices
111 .iter()
112 .map(|idx| source_l.schema().fields[*idx].data_type())
113 .collect_vec();
114
115 let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
116 .enable_preload_all_rows_by_config(¶ms.config)
117 .build()
118 .await;
119 let degree_state_table_l =
120 StateTableBuilder::new(degree_table_l, store.clone(), Some(vnodes.clone()))
121 .enable_preload_all_rows_by_config(¶ms.config)
122 .build()
123 .await;
124
125 let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
126 .enable_preload_all_rows_by_config(¶ms.config)
127 .build()
128 .await;
129 let degree_state_table_r = StateTableBuilder::new(degree_table_r, store, Some(vnodes))
130 .enable_preload_all_rows_by_config(¶ms.config)
131 .build()
132 .await;
133
134 #[allow(deprecated)]
137 let join_encoding_type = node
138 .get_join_encoding_type()
139 .map_or(params.config.developer.join_encoding_type, Into::into);
140
141 let args = HashJoinExecutorDispatcherArgs {
142 ctx: params.actor_context,
143 info: params.info.clone(),
144 source_l,
145 source_r,
146 params_l,
147 params_r,
148 null_safe,
149 output_indices,
150 cond: condition,
151 inequality_pairs,
152 state_table_l,
153 degree_state_table_l,
154 state_table_r,
155 degree_state_table_r,
156 lru_manager: params.watermark_epoch,
157 is_append_only,
158 metrics: params.executor_stats,
159 join_type_proto: node.get_join_type()?,
160 join_key_data_types,
161 chunk_size: params.config.developer.chunk_size,
162 high_join_amplification_threshold: (params.config.developer)
163 .high_join_amplification_threshold,
164 join_encoding_type,
165 watermark_indices_in_jk,
166 };
167
168 let exec = args.dispatch()?;
169 Ok((params.info, exec).into())
170 }
171}
172
173struct HashJoinExecutorDispatcherArgs<S: StateStore> {
174 ctx: ActorContextRef,
175 info: ExecutorInfo,
176 source_l: Executor,
177 source_r: Executor,
178 params_l: JoinParams,
179 params_r: JoinParams,
180 null_safe: Vec<bool>,
181 output_indices: Vec<usize>,
182 cond: Option<NonStrictExpression>,
183 inequality_pairs: Vec<InequalityPairInfo>,
184 state_table_l: StateTable<S>,
185 degree_state_table_l: StateTable<S>,
186 state_table_r: StateTable<S>,
187 degree_state_table_r: StateTable<S>,
188 lru_manager: AtomicU64Ref,
189 is_append_only: bool,
190 metrics: Arc<StreamingMetrics>,
191 join_type_proto: JoinTypeProto,
192 join_key_data_types: Vec<DataType>,
193 chunk_size: usize,
194 high_join_amplification_threshold: usize,
195 join_encoding_type: JoinEncodingType,
196 watermark_indices_in_jk: Vec<(usize, bool)>,
197}
198
199fn resolve_clean_watermark_indices_in_jk(
200 desc: Option<&HashJoinWatermarkHandleDesc>,
201 join_key_len: usize,
202) -> Vec<(usize, bool)> {
203 if let Some(desc) = desc {
204 return desc
205 .watermark_indices_in_jk
206 .iter()
207 .map(
208 |JoinKeyWatermarkIndex {
209 index,
210 do_state_cleaning,
211 }| (*index as usize, *do_state_cleaning),
212 )
213 .collect_vec();
214 }
215 if join_key_len > 0 {
218 vec![(0, true)]
219 } else {
220 vec![]
221 }
222}
223
224impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
225 type Output = StreamResult<Box<dyn Execute>>;
226
227 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
228 macro_rules! build {
230 ($join_type:ident, $join_encoding:ident) => {
231 Ok(
232 HashJoinExecutor::<K, S, { JoinType::$join_type }, $join_encoding>::new(
233 self.ctx,
234 self.info,
235 self.source_l,
236 self.source_r,
237 self.params_l,
238 self.params_r,
239 self.null_safe,
240 self.output_indices,
241 self.cond,
242 self.inequality_pairs,
243 self.state_table_l,
244 self.degree_state_table_l,
245 self.state_table_r,
246 self.degree_state_table_r,
247 self.lru_manager,
248 self.is_append_only,
249 self.metrics,
250 self.chunk_size,
251 self.high_join_amplification_threshold,
252 self.watermark_indices_in_jk.clone(),
253 )
254 .boxed(),
255 )
256 };
257 }
258
259 macro_rules! build_match {
260 ($($join_type:ident),*) => {
261 match (self.join_type_proto, self.join_encoding_type) {
262 (JoinTypeProto::AsofInner, _)
263 | (JoinTypeProto::AsofLeftOuter, _)
264 | (JoinTypeProto::Unspecified, _) => unreachable!(),
265 $(
266 (JoinTypeProto::$join_type, JoinEncodingType::Memory) => build!($join_type, MemoryEncoding),
267 (JoinTypeProto::$join_type, JoinEncodingType::Cpu) => build!($join_type, CpuEncoding),
268 )*
269 }
270 };
271 }
272 build_match! {
273 Inner,
274 LeftOuter,
275 RightOuter,
276 FullOuter,
277 LeftSemi,
278 LeftAnti,
279 RightSemi,
280 RightAnti
281 }
282 }
283
284 fn data_types(&self) -> &[DataType] {
285 &self.join_key_data_types
286 }
287}
288
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::{HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex};

    use super::resolve_clean_watermark_indices_in_jk;

    /// Builds a desc that carries only join-key watermark indices.
    fn desc_with(indices: Vec<JoinKeyWatermarkIndex>) -> HashJoinWatermarkHandleDesc {
        HashJoinWatermarkHandleDesc {
            watermark_indices_in_jk: indices,
            inequality_pairs: vec![],
        }
    }

    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_from_desc() {
        let desc = desc_with(vec![
            JoinKeyWatermarkIndex {
                index: 2,
                do_state_cleaning: true,
            },
            JoinKeyWatermarkIndex {
                index: 1,
                do_state_cleaning: false,
            },
        ]);
        assert_eq!(
            resolve_clean_watermark_indices_in_jk(Some(&desc), 3),
            vec![(2, true), (1, false)]
        );
    }

    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_explicit_empty_desc() {
        // A present desc is used verbatim, so an empty list does NOT fall back
        // to the `(0, true)` default even though join keys exist.
        assert_eq!(
            resolve_clean_watermark_indices_in_jk(Some(&desc_with(vec![])), 2),
            Vec::<(usize, bool)>::new()
        );
    }

    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_default_first() {
        assert_eq!(
            resolve_clean_watermark_indices_in_jk(None, 2),
            vec![(0, true)]
        );
    }

    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_empty_on_no_keys() {
        assert_eq!(
            resolve_clean_watermark_indices_in_jk(None, 0),
            Vec::<(usize, bool)>::new()
        );
    }
}