1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::DataType;
20use risingwave_common::util::functional::SameOrElseExt;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::plan_common::JoinType;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use risingwave_pb::stream_plan::{
25 HashJoinNode, HashJoinWatermarkHandleDesc, InequalityPairV2 as PbInequalityPairV2,
26 InequalityType as PbInequalityType, JoinKeyWatermarkIndex, PbJoinEncodingType,
27};
28
29use super::generic::GenericPlanNode;
30use super::stream::prelude::*;
31use super::stream_join_common::StreamJoinCommon;
32use super::utils::{
33 Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
34};
35use super::{
36 ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamDeltaJoin, StreamPlanRef as PlanRef,
37 TryToStreamPb, generic,
38};
39use crate::TableCatalog;
40use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprType, ExprVisitor, InequalityInputPair};
41use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
42use crate::optimizer::plan_node::utils::IndicesDisplay;
43use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
44use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
45use crate::scheduler::SchedulerResult;
46use crate::stream_fragmenter::BuildFragmentGraphState;
47
/// Streaming hash join plan node.
///
/// Joins two input streams on equi-join keys. When converted to protobuf it declares a state
/// table and a degree table for each side.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamHashJoin {
    pub base: PlanBase<Stream>,
    /// Generic join core; `core.on` is always a `JoinOn::EqPredicate` after construction.
    core: generic::Join<PlanRef>,

    /// Equi-join keys watermarked on both inputs, as `(index in join key, do_state_cleaning)`.
    /// At most one entry has `do_state_cleaning == true`.
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Inequality conditions that clean state and/or derive an output watermark, as
    /// `(conjunction index in other_cond, clean_left_state, clean_right_state, pair)`.
    inequality_pairs: Vec<(usize, bool, bool, InequalityInputPair)>,

    /// Whether the output stream kind is append-only.
    is_append_only: bool,
}
68
/// Output of [`derive_watermark_for_hash_join`].
struct WatermarkDeriveResult {
    /// Watermark columns of the join output (already mapped with `i2o_col_mapping`).
    watermark_columns: WatermarkColumns,
    /// `(index in join key, do_state_cleaning)` for every equi-join key watermarked on both sides.
    watermark_indices_in_jk: Vec<(usize, bool)>,
    /// `(conjunction index, clean_left_state, clean_right_state, pair)` for useful inequalities.
    inequality_pairs: Vec<(usize, bool, bool, InequalityInputPair)>,
}
78
79fn derive_watermark_for_hash_join(
89 core: &generic::Join<PlanRef>,
90 eq_join_predicate: &EqJoinPredicate,
91) -> WatermarkDeriveResult {
92 let ctx = core.ctx();
93 let l2i = core.l2i_col_mapping();
94 let r2i = core.r2i_col_mapping();
95
96 let mut watermark_indices_in_jk = vec![];
97 let mut inequality_pairs = vec![];
98
99 let mut found_jk_state_clean = false;
102 let mut found_ineq_clean_left = false;
103 let mut found_ineq_clean_right = false;
104
105 let mut watermark_columns = WatermarkColumns::new();
110 for (idx, (left_key, right_key)) in eq_join_predicate.eq_indexes().iter().enumerate() {
111 if let Some(l_wtmk_group) = core.left.watermark_columns().get_group(*left_key)
112 && let Some(r_wtmk_group) = core.right.watermark_columns().get_group(*right_key)
113 {
114 let do_state_cleaning = !found_jk_state_clean;
116 if do_state_cleaning {
117 found_jk_state_clean = true;
118 }
119 watermark_indices_in_jk.push((idx, do_state_cleaning));
120
121 if let Some(internal) = l2i.try_map(*left_key) {
122 watermark_columns.insert(
123 internal,
124 l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
125 );
126 }
127 if let Some(internal) = r2i.try_map(*right_key) {
128 watermark_columns.insert(
129 internal,
130 l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
131 );
132 }
133 }
134 }
135
136 let original_inequality_pairs = eq_join_predicate.inequality_pairs_v2();
139 for (conjunction_idx, pair) in original_inequality_pairs {
140 let InequalityInputPair {
141 left_idx,
142 right_idx,
143 op,
144 } = pair;
145
146 let both_upstream_has_watermark = core.left.watermark_columns().contains(left_idx)
148 && core.right.watermark_columns().contains(right_idx);
149 if !both_upstream_has_watermark {
150 continue;
151 }
152
153 let left_is_larger = matches!(op, ExprType::GreaterThan | ExprType::GreaterThanOrEqual);
158
159 let (clean_left, clean_right) = if left_is_larger {
163 let do_clean = !found_jk_state_clean && !found_ineq_clean_left;
164 if do_clean {
165 found_ineq_clean_left = true;
166 }
167 (do_clean, false)
168 } else {
169 let do_clean = !found_jk_state_clean && !found_ineq_clean_right;
170 if do_clean {
171 found_ineq_clean_right = true;
172 }
173 (false, do_clean)
174 };
175
176 let mut is_valuable_inequality = clean_left || clean_right;
177
178 if left_is_larger {
183 if let Some(internal) = l2i.try_map(left_idx)
184 && !watermark_columns.contains(internal)
185 {
186 watermark_columns.insert(internal, ctx.next_watermark_group_id());
187 is_valuable_inequality = true;
188 }
189 } else if let Some(internal) = r2i.try_map(right_idx)
190 && !watermark_columns.contains(internal)
191 {
192 watermark_columns.insert(internal, ctx.next_watermark_group_id());
193 is_valuable_inequality = true;
194 }
195
196 if is_valuable_inequality {
197 inequality_pairs.push((
198 conjunction_idx,
199 clean_left,
200 clean_right,
201 InequalityInputPair::new(left_idx, right_idx, op),
202 ));
203 }
204 }
205
206 let watermark_columns = watermark_columns.map_clone(&core.i2o_col_mapping());
207
208 WatermarkDeriveResult {
209 watermark_columns,
210 watermark_indices_in_jk,
211 inequality_pairs,
212 }
213}
214
215impl StreamHashJoin {
216 pub fn new(mut core: generic::Join<PlanRef>) -> Result<Self> {
217 let stream_kind = core.stream_kind()?;
218
219 let eq_join_predicate = {
221 let eq_join_predicate = core
222 .on
223 .as_eq_predicate_ref()
224 .expect("StreamHashJoin requires JoinOn::EqPredicate in core")
225 .clone();
226 let mut reorder_idx = vec![];
227 for (i, (left_key, right_key)) in eq_join_predicate.eq_indexes().iter().enumerate() {
228 if core.left.watermark_columns().contains(*left_key)
229 && core.right.watermark_columns().contains(*right_key)
230 {
231 reorder_idx.push(i);
232 }
233 }
234 eq_join_predicate.reorder(&reorder_idx)
235 };
236 core.on = generic::JoinOn::EqPredicate(eq_join_predicate.clone());
237
238 let dist = StreamJoinCommon::derive_dist(
239 core.left.distribution(),
240 core.right.distribution(),
241 &core,
242 );
243
244 let WatermarkDeriveResult {
246 watermark_columns,
247 watermark_indices_in_jk,
248 inequality_pairs,
249 } = derive_watermark_for_hash_join(&core, &eq_join_predicate);
250
251 let base = PlanBase::new_stream_with_core(
253 &core,
254 dist,
255 stream_kind,
256 false, watermark_columns,
258 MonotonicityMap::new(), );
260
261 Ok(Self {
262 base,
263 core,
264 watermark_indices_in_jk,
265 inequality_pairs,
266 is_append_only: stream_kind.is_append_only(),
267 })
268 }
269
270 pub fn join_type(&self) -> JoinType {
272 self.core.join_type
273 }
274
275 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
277 self.core
278 .on
279 .as_eq_predicate_ref()
280 .expect("StreamHashJoin should store predicate as EqJoinPredicate")
281 }
282
283 pub fn into_delta_join(self) -> StreamDeltaJoin {
285 StreamDeltaJoin::new(self.core).unwrap()
286 }
287
288 pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
289 let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
290 let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
291
292 StreamJoinCommon::get_dist_key_in_join_key(
293 &left_dk_indices,
294 &right_dk_indices,
295 self.eq_join_predicate(),
296 )
297 }
298
299 pub fn inequality_pairs(&self) -> &Vec<(usize, bool, bool, InequalityInputPair)> {
300 &self.inequality_pairs
301 }
302
303 fn clean_left_state_conjunction_idx(&self) -> Option<usize> {
305 self.inequality_pairs
306 .iter()
307 .find(|(_, clean_left, _, _)| *clean_left)
308 .map(|(idx, _, _, _)| *idx)
309 }
310
311 fn clean_right_state_conjunction_idx(&self) -> Option<usize> {
313 self.inequality_pairs
314 .iter()
315 .find(|(_, _, clean_right, _)| *clean_right)
316 .map(|(idx, _, _, _)| *idx)
317 }
318
319 fn infer_internal_and_degree_table_catalog(
336 &self,
337 input: PlanRef,
338 join_key_indices: Vec<usize>,
339 dk_indices_in_jk: Vec<usize>,
340 is_left: bool,
341 ) -> Result<(TableCatalog, TableCatalog, Vec<usize>)> {
342 let schema = input.schema();
343
344 let internal_table_dist_keys = dk_indices_in_jk
345 .iter()
346 .map(|idx| join_key_indices[*idx])
347 .collect_vec();
348
349 let degree_table_dist_keys = dk_indices_in_jk.clone();
350
351 let join_key_len = join_key_indices.len();
353 let mut pk_indices = join_key_indices.clone();
354
355 let mut deduped_input_pk_indices = vec![];
357 for input_pk_idx in input.stream_key().unwrap() {
358 if !pk_indices.contains(input_pk_idx)
359 && !deduped_input_pk_indices.contains(input_pk_idx)
360 {
361 deduped_input_pk_indices.push(*input_pk_idx);
362 }
363 }
364
365 pk_indices.extend(deduped_input_pk_indices.clone());
366
367 let (
369 clean_watermark_indices,
370 eq_join_key_clean_watermark_indices,
371 inequal_clean_watermark_indices,
372 ) = self.infer_clean_watermark_indices(&join_key_indices, is_left)?;
373
374 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
376 let internal_columns_fields = schema.fields().to_vec();
377
378 internal_columns_fields.iter().for_each(|field| {
379 internal_table_catalog_builder.add_column(field);
380 });
381 pk_indices.iter().for_each(|idx| {
382 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
383 });
384
385 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
386 internal_table_catalog_builder.set_clean_watermark_indices(clean_watermark_indices);
387
388 let mut degree_table_catalog_builder = TableCatalogBuilder::default();
390
391 let degree_column_field = Field::with_name(DataType::Int64, "_degree");
392
393 pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
394 degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
395 degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
396 });
397
398 degree_table_catalog_builder.add_column(°ree_column_field);
400 let degree_col_idx = degree_table_catalog_builder.columns().len() - 1;
401
402 let degree_inequality_col_idx = inequal_clean_watermark_indices
404 .iter()
405 .at_most_one()
406 .unwrap()
407 .map(|idx| {
408 degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
409 degree_table_catalog_builder.columns().len() - 1
410 });
411
412 let mut value_indices = vec![degree_col_idx];
414 if let Some(idx) = degree_inequality_col_idx {
415 value_indices.push(idx);
416 }
417 degree_table_catalog_builder.set_value_indices(value_indices);
418
419 degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
420
421 let degree_clean_watermark_indices = if let Some(idx) = degree_inequality_col_idx {
423 vec![idx]
424 } else {
425 eq_join_key_clean_watermark_indices
428 .iter()
429 .map(|input_col_idx| {
430 join_key_indices
431 .iter()
432 .position(|jk_idx| jk_idx == input_col_idx)
433 .expect("eq join key clean watermark index must exist in join_key_indices")
434 })
435 .collect()
436 };
437 degree_table_catalog_builder.set_clean_watermark_indices(degree_clean_watermark_indices);
438
439 Ok((
440 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
441 degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
442 deduped_input_pk_indices,
443 ))
444 }
445
446 fn infer_clean_watermark_indices(
455 &self,
456 join_key_indices: &[usize],
457 is_left: bool,
458 ) -> Result<(Vec<usize>, Vec<usize>, Vec<usize>)> {
459 let mut clean_watermark_indices = vec![];
460
461 let eq_join_key_clean_watermark_indices =
463 self.infer_eq_join_key_clean_watermark_indices(join_key_indices);
464 clean_watermark_indices.extend(eq_join_key_clean_watermark_indices.clone());
465 let mut inequal_clean_watermark_indices = vec![];
466 for (_conjunction_idx, clean_left, clean_right, pair) in &self.inequality_pairs {
468 if is_left && *clean_left {
469 let col_idx = pair.left_idx;
470 if !clean_watermark_indices.contains(&col_idx) {
471 inequal_clean_watermark_indices.push(col_idx);
472 }
473 } else if !is_left && *clean_right {
474 let col_idx = pair.right_idx;
475 if !clean_watermark_indices.contains(&col_idx) {
476 inequal_clean_watermark_indices.push(col_idx);
477 }
478 }
479 }
480
481 clean_watermark_indices.extend(inequal_clean_watermark_indices.clone());
482
483 if clean_watermark_indices.len() > 1 {
486 bail!(
487 "Expected at most 1 clean_watermark_index per table, got {:?}",
488 clean_watermark_indices
489 )
490 }
491
492 Ok((
493 clean_watermark_indices,
494 eq_join_key_clean_watermark_indices,
495 inequal_clean_watermark_indices,
496 ))
497 }
498
499 fn infer_eq_join_key_clean_watermark_indices(&self, join_key_indices: &[usize]) -> Vec<usize> {
501 let mut clean_indices = vec![];
502 for (idx_in_jk, do_state_cleaning) in &self.watermark_indices_in_jk {
503 if *do_state_cleaning {
504 let col_idx = join_key_indices[*idx_in_jk];
505 if !clean_indices.contains(&col_idx) {
506 clean_indices.push(col_idx);
507 }
508 }
509 }
510 clean_indices
511 }
512}
513
514impl Distill for StreamHashJoin {
515 fn distill<'a>(&self) -> XmlNode<'a> {
516 let (ljk, rjk) = self
517 .eq_join_predicate()
518 .eq_indexes()
519 .first()
520 .cloned()
521 .expect("first join key");
522
523 let clean_left_state_conjunction_idx = self.clean_left_state_conjunction_idx();
524 let clean_right_state_conjunction_idx = self.clean_right_state_conjunction_idx();
525 let clean_state_in_jk_indices: Vec<usize> = self
526 .watermark_indices_in_jk
527 .iter()
528 .filter_map(
529 |(idx, do_state_cleaning)| if *do_state_cleaning { Some(*idx) } else { None },
530 )
531 .collect();
532
533 let name = plan_node_name!("StreamHashJoin",
534 { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
535 { "interval", clean_left_state_conjunction_idx.is_some() && clean_right_state_conjunction_idx.is_some() },
536 { "append_only", self.is_append_only },
537 );
538 let verbose = self.base.ctx().is_explain_verbose();
539 let mut vec = Vec::with_capacity(6);
540 vec.push(("type", Pretty::debug(&self.core.join_type)));
541
542 let concat_schema = self.core.concat_schema();
543 vec.push((
544 "predicate",
545 Pretty::debug(&EqJoinPredicateDisplay {
546 eq_join_predicate: self.eq_join_predicate(),
547 input_schema: &concat_schema,
548 }),
549 ));
550
551 let get_other_cond = |conjunction_idx| {
552 Pretty::debug(&ExprDisplay {
553 expr: &self.eq_join_predicate().other_cond().conjunctions[conjunction_idx],
554 input_schema: &concat_schema,
555 })
556 };
557 let get_eq_cond = |conjunction_idx| {
558 Pretty::debug(&ExprDisplay {
559 expr: &self.eq_join_predicate().eq_cond().conjunctions[conjunction_idx],
560 input_schema: &concat_schema,
561 })
562 };
563 if !clean_state_in_jk_indices.is_empty() {
564 vec.push((
565 "conditions_to_clean_state_in_join_key",
566 Pretty::Array(
567 clean_state_in_jk_indices
568 .iter()
569 .map(|idx| get_eq_cond(*idx))
570 .collect::<Vec<_>>(),
571 ),
572 ));
573 }
574 if let Some(i) = clean_left_state_conjunction_idx {
575 vec.push(("conditions_to_clean_left_state_table", get_other_cond(i)));
576 }
577 if let Some(i) = clean_right_state_conjunction_idx {
578 vec.push(("conditions_to_clean_right_state_table", get_other_cond(i)));
579 }
580 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
581 vec.push(("output_watermarks", ow));
582 }
583
584 if verbose {
585 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
586 vec.push(("output", data));
587 }
588
589 childless_record(name, vec)
590 }
591}
592
593impl PlanTreeNodeBinary<Stream> for StreamHashJoin {
594 fn left(&self) -> PlanRef {
595 self.core.left.clone()
596 }
597
598 fn right(&self) -> PlanRef {
599 self.core.right.clone()
600 }
601
602 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
603 let mut core = self.core.clone();
604 core.left = left;
605 core.right = right;
606 Self::new(core).unwrap()
607 }
608}
609
610impl_plan_tree_node_for_binary! { Stream, StreamHashJoin }
611
612impl TryToStreamPb for StreamHashJoin {
613 fn try_to_stream_prost_body(
614 &self,
615 state: &mut BuildFragmentGraphState,
616 ) -> SchedulerResult<NodeBody> {
617 let left_jk_indices = self.eq_join_predicate().left_eq_indexes();
618 let right_jk_indices = self.eq_join_predicate().right_eq_indexes();
619 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
620 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
621
622 let retract =
623 self.left().stream_kind().is_retract() || self.right().stream_kind().is_retract();
624
625 let dk_indices_in_jk = self.derive_dist_key_in_join_key();
626
627 let (left_table, left_degree_table, left_deduped_input_pk_indices) = self
628 .infer_internal_and_degree_table_catalog(
629 self.left(),
630 left_jk_indices,
631 dk_indices_in_jk.clone(),
632 true, )?;
634 let (right_table, right_degree_table, right_deduped_input_pk_indices) = self
635 .infer_internal_and_degree_table_catalog(
636 self.right(),
637 right_jk_indices,
638 dk_indices_in_jk,
639 false, )?;
641
642 let left_deduped_input_pk_indices = left_deduped_input_pk_indices
643 .iter()
644 .map(|idx| *idx as u32)
645 .collect_vec();
646
647 let right_deduped_input_pk_indices = right_deduped_input_pk_indices
648 .iter()
649 .map(|idx| *idx as u32)
650 .collect_vec();
651
652 let (left_table, left_degree_table) = (
653 left_table.with_id(state.gen_table_id_wrapped()),
654 left_degree_table.with_id(state.gen_table_id_wrapped()),
655 );
656 let (right_table, right_degree_table) = (
657 right_table.with_id(state.gen_table_id_wrapped()),
658 right_degree_table.with_id(state.gen_table_id_wrapped()),
659 );
660
661 let null_safe_prost = self.eq_join_predicate().null_safes().into_iter().collect();
662
663 let condition = self
664 .eq_join_predicate()
665 .other_cond()
666 .as_expr_unless_true()
667 .map(|expr| expr.to_expr_proto_checked_pure(retract, "JOIN condition"))
668 .transpose()?;
669
670 fn expr_type_to_pb_inequality_type(op: ExprType) -> i32 {
672 match op {
673 ExprType::LessThan => PbInequalityType::LessThan as i32,
674 ExprType::LessThanOrEqual => PbInequalityType::LessThanOrEqual as i32,
675 ExprType::GreaterThan => PbInequalityType::GreaterThan as i32,
676 ExprType::GreaterThanOrEqual => PbInequalityType::GreaterThanOrEqual as i32,
677 _ => PbInequalityType::Unspecified as i32,
678 }
679 }
680
681 Ok(NodeBody::HashJoin(Box::new(HashJoinNode {
682 join_type: self.core.join_type as i32,
683 left_key: left_jk_indices_prost,
684 right_key: right_jk_indices_prost,
685 null_safe: null_safe_prost,
686 condition,
687 watermark_handle_desc: Some(HashJoinWatermarkHandleDesc {
688 watermark_indices_in_jk: self
689 .watermark_indices_in_jk
690 .iter()
691 .map(|(idx, do_clean)| JoinKeyWatermarkIndex {
692 index: *idx as u32,
693 do_state_cleaning: *do_clean,
694 })
695 .collect(),
696 inequality_pairs: self
697 .inequality_pairs
698 .iter()
699 .map(
700 |(_conjunction_idx, clean_left, clean_right, pair)| PbInequalityPairV2 {
701 left_idx: pair.left_idx as u32,
702 right_idx: pair.right_idx as u32,
703 clean_left_state: *clean_left,
704 clean_right_state: *clean_right,
705 op: expr_type_to_pb_inequality_type(pair.op),
706 },
707 )
708 .collect(),
709 }),
710 left_table: Some(left_table.to_internal_table_prost()),
711 right_table: Some(right_table.to_internal_table_prost()),
712 left_degree_table: Some(left_degree_table.to_internal_table_prost()),
713 right_degree_table: Some(right_degree_table.to_internal_table_prost()),
714 left_deduped_input_pk_indices,
715 right_deduped_input_pk_indices,
716 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
717 is_append_only: self.is_append_only,
718 #[allow(deprecated)]
720 join_encoding_type: PbJoinEncodingType::Unspecified as _,
721 })))
722 }
723}
724
725impl ExprRewritable<Stream> for StreamHashJoin {
726 fn has_rewritable_expr(&self) -> bool {
727 true
728 }
729
730 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
731 let mut core = self.core.clone();
732 core.rewrite_exprs(r);
733 Self::new(core).unwrap().into()
734 }
735}
736
737impl ExprVisitable for StreamHashJoin {
738 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
739 self.core.visit_exprs(v);
740 }
741}