1use itertools::{EitherOrBoth, Itertools};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::JoinType;
20
21use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
22use crate::TableCatalog;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata as _;
27use crate::optimizer::plan_node::stream::prelude::*;
28use crate::optimizer::plan_node::utils::TableCatalogBuilder;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
31
/// Description of a join between two plan inputs, generic over the plan
/// reference type.
///
/// The join first defines an "internal" column space whose width depends on
/// `join_type` (see `Join::full_out_col_num`); `output_indices` then selects
/// which of those internal columns are actually emitted, and in what order.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join<PlanRef> {
    /// Left input of the join.
    pub left: PlanRef,
    /// Right input of the join.
    pub right: PlanRef,
    /// Join condition.
    pub on: Condition,
    /// Kind of join to perform.
    pub join_type: JoinType,
    /// Internal column indices emitted by the join, in output order.
    /// `Join::new` debug-asserts that no index appears twice.
    pub output_indices: Vec<usize>,
}
46
/// Returns `true` if any value occurs more than once in `slice`.
///
/// Every element is compared against the elements that follow it, so the
/// check is quadratic in the slice length and allocates nothing.
pub(crate) fn has_repeated_element(slice: &[usize]) -> bool {
    slice
        .iter()
        .enumerate()
        .any(|(pos, value)| slice[pos + 1..].contains(value))
}
50
51impl<PlanRef: GenericPlanRef> Join<PlanRef> {
52 pub(crate) fn clone_with_inputs<OtherPlanRef>(
53 &self,
54 left: OtherPlanRef,
55 right: OtherPlanRef,
56 ) -> Join<OtherPlanRef> {
57 Join {
58 left,
59 right,
60 on: self.on.clone(),
61 join_type: self.join_type,
62 output_indices: self.output_indices.clone(),
63 }
64 }
65
66 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67 self.on = self.on.clone().rewrite_expr(r);
68 }
69
70 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71 self.on.visit_expr(v);
72 }
73
74 pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
75 let left_len = self.left.schema().len();
76 let right_len = self.right.schema().len();
77 let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
78 eq_predicate.eq_indexes()
79 }
80
81 pub fn new(
82 left: PlanRef,
83 right: PlanRef,
84 on: Condition,
85 join_type: JoinType,
86 output_indices: Vec<usize>,
87 ) -> Self {
88 debug_assert!(!has_repeated_element(&output_indices));
90 Self {
91 left,
92 right,
93 on,
94 join_type,
95 output_indices,
96 }
97 }
98}
99
100impl Join<StreamPlanRef> {
101 pub fn stream_kind(&self) -> Result<StreamKind> {
102 let left_kind = reject_upsert_input!(self.left, "Join");
103 let right_kind = reject_upsert_input!(self.right, "Join");
104
105 if let JoinType::Inner | JoinType::AsofInner = self.join_type
107 && let StreamKind::AppendOnly = left_kind
108 && let StreamKind::AppendOnly = right_kind
109 {
110 Ok(StreamKind::AppendOnly)
111 } else {
112 Ok(StreamKind::Retract)
113 }
114 }
115
116 pub fn infer_internal_and_degree_table_catalog(
118 input: StreamPlanRef,
119 join_key_indices: Vec<usize>,
120 dk_indices_in_jk: Vec<usize>,
121 ) -> (TableCatalog, TableCatalog, Vec<usize>) {
122 let schema = input.schema();
123
124 let internal_table_dist_keys = dk_indices_in_jk
125 .iter()
126 .map(|idx| join_key_indices[*idx])
127 .collect_vec();
128
129 let degree_table_dist_keys = dk_indices_in_jk.clone();
130
131 let join_key_len = join_key_indices.len();
133 let mut pk_indices = join_key_indices;
134
135 let mut deduped_input_pk_indices = vec![];
137 for input_pk_idx in input.stream_key().unwrap() {
138 if !pk_indices.contains(input_pk_idx)
139 && !deduped_input_pk_indices.contains(input_pk_idx)
140 {
141 deduped_input_pk_indices.push(*input_pk_idx);
142 }
143 }
144
145 pk_indices.extend(deduped_input_pk_indices.clone());
146
147 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
149 let internal_columns_fields = schema.fields().to_vec();
150
151 internal_columns_fields.iter().for_each(|field| {
152 internal_table_catalog_builder.add_column(field);
153 });
154 pk_indices.iter().for_each(|idx| {
155 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
156 });
157
158 let mut degree_table_catalog_builder = TableCatalogBuilder::default();
160
161 let degree_column_field = Field::with_name(DataType::Int64, "_degree");
162
163 pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
164 degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
165 degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
166 });
167 degree_table_catalog_builder.add_column(°ree_column_field);
168 degree_table_catalog_builder
169 .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);
170
171 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
172 degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
173
174 (
175 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
176 degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
177 deduped_input_pk_indices,
178 )
179 }
180}
181
182impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
183 fn schema(&self) -> Schema {
184 let left_schema = self.left.schema();
185 let right_schema = self.right.schema();
186 let i2l = self.i2l_col_mapping();
187 let i2r = self.i2r_col_mapping();
188 let fields = self
189 .output_indices
190 .iter()
191 .map(|&i| match (i2l.try_map(i), i2r.try_map(i)) {
192 (Some(l_i), None) => left_schema.fields()[l_i].clone(),
193 (None, Some(r_i)) => right_schema.fields()[r_i].clone(),
194 _ => panic!(
195 "left len {}, right len {}, i {}, lmap {:?}, rmap {:?}",
196 left_schema.len(),
197 right_schema.len(),
198 i,
199 i2l,
200 i2r
201 ),
202 })
203 .collect();
204 Schema { fields }
205 }
206
207 fn stream_key(&self) -> Option<Vec<usize>> {
208 let eq_indexes = self.eq_indexes();
209 let left_pk = self.left.stream_key()?;
210 let right_pk = self.right.stream_key()?;
211 let l2i = self.l2i_col_mapping();
212 let r2i = self.r2i_col_mapping();
213 let full_out_col_num = self.internal_column_num();
214 let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
215
216 let mut pk_indices_internal = left_pk
218 .iter()
219 .map(|index| l2i.try_map(*index))
220 .chain(right_pk.iter().map(|index| r2i.try_map(*index)))
221 .flatten()
222 .collect::<Vec<_>>();
223
224 let either_or_both = self.add_which_join_key_to_pk();
225
226 for (lk, rk) in eq_indexes {
227 match either_or_both {
228 EitherOrBoth::Left(_) => {
229 if let Some(rk_internal) = r2i.try_map(rk) {
235 pk_indices_internal.retain(|&x| x != rk_internal);
236 }
237 if let Some(lk_internal) = l2i.try_map(lk)
239 && !pk_indices_internal.contains(&lk_internal)
240 {
241 pk_indices_internal.push(lk_internal);
242 }
243 }
244 EitherOrBoth::Right(_) => {
245 if let Some(lk_internal) = l2i.try_map(lk) {
248 pk_indices_internal.retain(|&x| x != lk_internal);
249 }
250 if let Some(rk_internal) = r2i.try_map(rk)
252 && !pk_indices_internal.contains(&rk_internal)
253 {
254 pk_indices_internal.push(rk_internal);
255 }
256 }
257 EitherOrBoth::Both(_, _) => {
258 if let Some(lk_internal) = l2i.try_map(lk)
259 && !pk_indices_internal.contains(&lk_internal)
260 {
261 pk_indices_internal.push(lk_internal);
262 }
263 if let Some(rk_internal) = r2i.try_map(rk)
264 && !pk_indices_internal.contains(&rk_internal)
265 {
266 pk_indices_internal.push(rk_internal);
267 }
268 }
269 };
270 }
271
272 let pk_indices = pk_indices_internal
274 .iter()
275 .map(|&index| i2o.try_map(index))
276 .collect::<Option<Vec<_>>>()?;
277
278 Some(pk_indices)
279 }
280
281 fn ctx(&self) -> OptimizerContextRef {
282 self.left.ctx()
283 }
284
285 fn functional_dependency(&self) -> FunctionalDependencySet {
286 let left_len = self.left.schema().len();
287 let right_len = self.right.schema().len();
288 let left_fd_set = self.left.functional_dependency().clone();
289 let right_fd_set = self.right.functional_dependency().clone();
290
291 let full_out_col_num = self.internal_column_num();
292
293 let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
294 ColIndexMapping::with_shift_offset(left_len, 0)
295 .composite(&ColIndexMapping::identity(full_out_col_num))
296 .rewrite_functional_dependency_set(left_fd_set)
297 };
298 let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
299 ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
300 .rewrite_functional_dependency_set(right_fd_set)
301 };
302 let fd_set: FunctionalDependencySet = match self.join_type {
303 JoinType::Inner | JoinType::AsofInner => {
304 let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
305 for i in &self.on.conjunctions {
306 if let Some((col, _)) = i.as_eq_const() {
307 fd_set.add_constant_columns(&[col.index()])
308 } else if let Some((left, right)) = i.as_eq_cond() {
309 fd_set.add_functional_dependency_by_column_indices(
310 &[left.index()],
311 &[right.index()],
312 );
313 fd_set.add_functional_dependency_by_column_indices(
314 &[right.index()],
315 &[left.index()],
316 );
317 }
318 }
319 get_new_left_fd_set(left_fd_set)
320 .into_dependencies()
321 .into_iter()
322 .chain(get_new_right_fd_set(right_fd_set).into_dependencies())
323 .for_each(|fd| fd_set.add_functional_dependency(fd));
324 fd_set
325 }
326 JoinType::LeftOuter | JoinType::AsofLeftOuter => get_new_left_fd_set(left_fd_set),
327 JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
328 JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
329 JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
330 JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
331 JoinType::Unspecified => unreachable!(),
332 };
333 ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
334 .rewrite_functional_dependency_set(fd_set)
335 }
336}
337
338impl<PlanRef> Join<PlanRef> {
339 pub fn decompose(self) -> (PlanRef, PlanRef, Condition, JoinType, Vec<usize>) {
340 (
341 self.left,
342 self.right,
343 self.on,
344 self.join_type,
345 self.output_indices,
346 )
347 }
348}
349
350impl<PlanRef: GenericPlanRef> Join<PlanRef> {
351 pub fn full_out_col_num(left_len: usize, right_len: usize, join_type: JoinType) -> usize {
352 match join_type {
353 JoinType::Inner
354 | JoinType::LeftOuter
355 | JoinType::RightOuter
356 | JoinType::FullOuter
357 | JoinType::AsofInner
358 | JoinType::AsofLeftOuter => left_len + right_len,
359 JoinType::LeftSemi | JoinType::LeftAnti => left_len,
360 JoinType::RightSemi | JoinType::RightAnti => right_len,
361 JoinType::Unspecified => unreachable!(),
362 }
363 }
364
365 pub fn with_full_output(
366 left: PlanRef,
367 right: PlanRef,
368 join_type: JoinType,
369 on: Condition,
370 ) -> Self {
371 let out_column_num =
372 Self::full_out_col_num(left.schema().len(), right.schema().len(), join_type);
373 Self {
374 left,
375 right,
376 join_type,
377 on,
378 output_indices: (0..out_column_num).collect(),
379 }
380 }
381
382 pub fn internal_column_num(&self) -> usize {
383 Self::full_out_col_num(
384 self.left.schema().len(),
385 self.right.schema().len(),
386 self.join_type,
387 )
388 }
389
390 pub fn is_full_out(&self) -> bool {
391 self.output_indices.len() == self.internal_column_num()
392 }
393
394 pub fn i2l_col_mapping(&self) -> ColIndexMapping {
396 let left_len = self.left.schema().len();
397 let right_len = self.right.schema().len();
398
399 match self.join_type {
400 JoinType::Inner
401 | JoinType::LeftOuter
402 | JoinType::RightOuter
403 | JoinType::FullOuter
404 | JoinType::AsofInner
405 | JoinType::AsofLeftOuter => {
406 ColIndexMapping::identity_or_none(left_len + right_len, left_len)
407 }
408
409 JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::identity(left_len),
410 JoinType::RightSemi | JoinType::RightAnti => {
411 ColIndexMapping::empty(right_len, left_len)
412 }
413 JoinType::Unspecified => unreachable!(),
414 }
415 }
416
417 pub fn i2r_col_mapping(&self) -> ColIndexMapping {
419 let left_len = self.left.schema().len();
420 let right_len = self.right.schema().len();
421
422 match self.join_type {
423 JoinType::Inner
424 | JoinType::LeftOuter
425 | JoinType::RightOuter
426 | JoinType::FullOuter
427 | JoinType::AsofInner
428 | JoinType::AsofLeftOuter => {
429 ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
430 }
431 JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::empty(left_len, right_len),
432 JoinType::RightSemi | JoinType::RightAnti => ColIndexMapping::identity(right_len),
433 JoinType::Unspecified => unreachable!(),
434 }
435 }
436
437 pub fn i2l_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
439 let left_len = self.left.schema().len();
440 let right_len = self.right.schema().len();
441
442 ColIndexMapping::identity_or_none(left_len + right_len, left_len)
443 }
444
445 pub fn i2r_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
447 let left_len = self.left.schema().len();
448 let right_len = self.right.schema().len();
449
450 ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
451 }
452
453 pub fn l2i_col_mapping(&self) -> ColIndexMapping {
455 self.i2l_col_mapping()
456 .inverse()
457 .expect("must be invertible")
458 }
459
460 pub fn r2i_col_mapping(&self) -> ColIndexMapping {
462 self.i2r_col_mapping()
463 .inverse()
464 .expect("must be invertible")
465 }
466
467 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
469 ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
470 }
471
472 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
474 ColIndexMapping::new(
477 self.output_indices.iter().map(|x| Some(*x)).collect(),
478 self.internal_column_num(),
479 )
480 }
481
482 pub fn add_which_join_key_to_pk(&self) -> EitherOrBoth<(), ()> {
483 match self.join_type {
484 JoinType::Inner | JoinType::AsofInner => {
485 EitherOrBoth::Left(())
489 }
490 JoinType::LeftOuter
491 | JoinType::LeftSemi
492 | JoinType::LeftAnti
493 | JoinType::AsofLeftOuter => EitherOrBoth::Left(()),
494 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
495 EitherOrBoth::Right(())
496 }
497 JoinType::FullOuter => EitherOrBoth::Both((), ()),
498 JoinType::Unspecified => unreachable!(),
499 }
500 }
501
502 pub fn concat_schema(&self) -> Schema {
503 Schema::new(
504 [
505 self.left.schema().fields.clone(),
506 self.right.schema().fields.clone(),
507 ]
508 .concat(),
509 )
510 }
511}
512
513pub fn push_down_into_join(
519 predicate: &mut Condition,
520 left_col_num: usize,
521 right_col_num: usize,
522 ty: JoinType,
523 push_temporal_predicate: bool,
524) -> (Condition, Condition, Condition) {
525 let (left, right) = push_down_to_inputs(
526 predicate,
527 left_col_num,
528 right_col_num,
529 can_push_left_from_filter(ty),
530 can_push_right_from_filter(ty),
531 push_temporal_predicate,
532 );
533
534 let on = if can_push_on_from_filter(ty) {
535 let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
536
537 if push_temporal_predicate {
538 Condition { conjunctions }
539 } else {
540 let on = Condition {
542 conjunctions: conjunctions
543 .extract_if(.., |expr| expr.count_nows() == 0)
544 .collect(),
545 };
546 predicate.conjunctions = conjunctions;
547 on
548 }
549 } else {
550 Condition::true_cond()
551 };
552 (left, right, on)
553}
554
555pub fn push_down_join_condition(
560 on_condition: &mut Condition,
561 left_col_num: usize,
562 right_col_num: usize,
563 ty: JoinType,
564 push_temporal_predicate: bool,
565) -> (Condition, Condition) {
566 push_down_to_inputs(
567 on_condition,
568 left_col_num,
569 right_col_num,
570 can_push_left_from_on(ty),
571 can_push_right_from_on(ty),
572 push_temporal_predicate,
573 )
574}
575
576fn push_down_to_inputs(
581 predicate: &mut Condition,
582 left_col_num: usize,
583 right_col_num: usize,
584 push_left: bool,
585 push_right: bool,
586 push_temporal_predicate: bool,
587) -> (Condition, Condition) {
588 let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
589 let (mut left, right, mut others) = if push_temporal_predicate {
590 Condition { conjunctions }.split(left_col_num, right_col_num)
591 } else {
592 let temporal_filter_cons = conjunctions
593 .extract_if(.., |e| e.count_nows() != 0)
594 .collect_vec();
595 let (left, right, mut others) =
596 Condition { conjunctions }.split(left_col_num, right_col_num);
597
598 others.conjunctions.extend(temporal_filter_cons);
599 (left, right, others)
600 };
601
602 if !push_left {
603 others.conjunctions.extend(left);
604 left = Condition::true_cond();
605 };
606
607 let right = if push_right {
608 let mut mapping = ColIndexMapping::with_shift_offset(
609 left_col_num + right_col_num,
610 -(left_col_num as isize),
611 );
612 right.rewrite_expr(&mut mapping)
613 } else {
614 others.conjunctions.extend(right);
615 Condition::true_cond()
616 };
617
618 predicate.conjunctions = others.conjunctions;
619
620 (left, right)
621}
622
623pub fn can_push_left_from_filter(ty: JoinType) -> bool {
624 matches!(
625 ty,
626 JoinType::Inner
627 | JoinType::LeftOuter
628 | JoinType::LeftSemi
629 | JoinType::LeftAnti
630 | JoinType::AsofInner
631 | JoinType::AsofLeftOuter
632 )
633}
634
635pub fn can_push_right_from_filter(ty: JoinType) -> bool {
636 matches!(
637 ty,
638 JoinType::Inner
639 | JoinType::RightOuter
640 | JoinType::RightSemi
641 | JoinType::RightAnti
642 | JoinType::AsofInner
643 )
644}
645
646pub fn can_push_on_from_filter(ty: JoinType) -> bool {
647 matches!(
648 ty,
649 JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi
650 )
651}
652
653pub fn can_push_left_from_on(ty: JoinType) -> bool {
654 matches!(
655 ty,
656 JoinType::Inner
657 | JoinType::RightOuter
658 | JoinType::LeftSemi
659 | JoinType::AsofInner
660 | JoinType::AsofLeftOuter
661 )
662}
663
664pub fn can_push_right_from_on(ty: JoinType) -> bool {
665 matches!(
666 ty,
667 JoinType::Inner
668 | JoinType::LeftOuter
669 | JoinType::RightSemi
670 | JoinType::AsofInner
671 | JoinType::AsofLeftOuter
672 )
673}