1use itertools::{EitherOrBoth, Itertools};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::JoinType;
20
21use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
22use crate::TableCatalog;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata as _;
27use crate::optimizer::plan_node::stream::prelude::*;
28use crate::optimizer::plan_node::utils::TableCatalogBuilder;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
31
/// Generic join plan node.
///
/// `output_indices` index into the join's *internal* schema: for inner, outer and ASOF joins
/// that is the left schema followed by the right schema; for left semi/anti joins it is only
/// the left schema and for right semi/anti joins only the right schema (see
/// `full_out_col_num`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join<PlanRef> {
    /// Left input.
    pub left: PlanRef,
    /// Right input.
    pub right: PlanRef,
    /// Join condition; its column indices address the concatenation of the left and right
    /// input schemas.
    pub on: Condition,
    pub join_type: JoinType,
    /// Internal-schema indices of the output columns, in output order. Expected to be free of
    /// duplicates (`Join::new` debug-asserts this).
    pub output_indices: Vec<usize>,
}
46
/// Returns `true` if some value appears more than once in `slice`.
///
/// Compares every element against the ones after it, so the cost is quadratic in the slice
/// length. `Join::new` uses it to validate `output_indices` in debug builds.
pub(crate) fn has_repeated_element(slice: &[usize]) -> bool {
    slice
        .iter()
        .enumerate()
        .any(|(pos, value)| slice[pos + 1..].contains(value))
}
50
51impl<PlanRef: GenericPlanRef> Join<PlanRef> {
52 pub(crate) fn clone_with_inputs<OtherPlanRef>(
53 &self,
54 left: OtherPlanRef,
55 right: OtherPlanRef,
56 ) -> Join<OtherPlanRef> {
57 Join {
58 left,
59 right,
60 on: self.on.clone(),
61 join_type: self.join_type,
62 output_indices: self.output_indices.clone(),
63 }
64 }
65
66 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67 self.on = self.on.clone().rewrite_expr(r);
68 }
69
70 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71 self.on.visit_expr(v);
72 }
73
74 pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
75 let left_len = self.left.schema().len();
76 let right_len = self.right.schema().len();
77 let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
78 eq_predicate.eq_indexes()
79 }
80
81 pub fn new(
82 left: PlanRef,
83 right: PlanRef,
84 on: Condition,
85 join_type: JoinType,
86 output_indices: Vec<usize>,
87 ) -> Self {
88 debug_assert!(!has_repeated_element(&output_indices));
90 Self {
91 left,
92 right,
93 on,
94 join_type,
95 output_indices,
96 }
97 }
98}
99
100impl Join<StreamPlanRef> {
101 pub fn stream_kind(&self) -> Result<StreamKind> {
102 let left_kind = reject_upsert_input!(self.left, "Join");
103 let right_kind = reject_upsert_input!(self.right, "Join");
104
105 if let JoinType::Inner | JoinType::AsofInner = self.join_type
107 && let StreamKind::AppendOnly = left_kind
108 && let StreamKind::AppendOnly = right_kind
109 {
110 Ok(StreamKind::AppendOnly)
111 } else {
112 Ok(StreamKind::Retract)
113 }
114 }
115
116 pub fn infer_internal_and_degree_table_catalog(
118 input: StreamPlanRef,
119 join_key_indices: Vec<usize>,
120 dk_indices_in_jk: Vec<usize>,
121 ) -> (TableCatalog, TableCatalog, Vec<usize>) {
122 let schema = input.schema();
123
124 let internal_table_dist_keys = dk_indices_in_jk
125 .iter()
126 .map(|idx| join_key_indices[*idx])
127 .collect_vec();
128
129 let degree_table_dist_keys = dk_indices_in_jk.clone();
130
131 let join_key_len = join_key_indices.len();
133 let mut pk_indices = join_key_indices;
134
135 let mut deduped_input_pk_indices = vec![];
137 for input_pk_idx in input.stream_key().unwrap() {
138 if !pk_indices.contains(input_pk_idx)
139 && !deduped_input_pk_indices.contains(input_pk_idx)
140 {
141 deduped_input_pk_indices.push(*input_pk_idx);
142 }
143 }
144
145 pk_indices.extend(deduped_input_pk_indices.clone());
146
147 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
149 let internal_columns_fields = schema.fields().to_vec();
150
151 internal_columns_fields.iter().for_each(|field| {
152 internal_table_catalog_builder.add_column(field);
153 });
154 pk_indices.iter().for_each(|idx| {
155 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
156 });
157
158 let mut degree_table_catalog_builder = TableCatalogBuilder::default();
160
161 let degree_column_field = Field::with_name(DataType::Int64, "_degree");
162
163 pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
164 degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
165 degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
166 });
167 degree_table_catalog_builder.add_column(°ree_column_field);
168 degree_table_catalog_builder
169 .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);
170
171 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
172 degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
173
174 (
175 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
176 degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
177 deduped_input_pk_indices,
178 )
179 }
180}
181
182impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
183 fn schema(&self) -> Schema {
184 let left_schema = self.left.schema();
185 let right_schema = self.right.schema();
186 let i2l = self.i2l_col_mapping();
187 let i2r = self.i2r_col_mapping();
188 let fields = self
189 .output_indices
190 .iter()
191 .map(|&i| match (i2l.try_map(i), i2r.try_map(i)) {
192 (Some(l_i), None) => left_schema.fields()[l_i].clone(),
193 (None, Some(r_i)) => right_schema.fields()[r_i].clone(),
194 _ => panic!(
195 "left len {}, right len {}, i {}, lmap {:?}, rmap {:?}",
196 left_schema.len(),
197 right_schema.len(),
198 i,
199 i2l,
200 i2r
201 ),
202 })
203 .collect();
204 Schema { fields }
205 }
206
207 fn stream_key(&self) -> Option<Vec<usize>> {
208 let eq_indexes = self.eq_indexes();
209 let left_pk = self.left.stream_key()?;
210 let right_pk = self.right.stream_key()?;
211 let l2i = self.l2i_col_mapping();
212 let r2i = self.r2i_col_mapping();
213 let full_out_col_num = self.internal_column_num();
214 let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
215
216 let mut pk_indices = left_pk
217 .iter()
218 .map(|index| l2i.try_map(*index))
219 .chain(right_pk.iter().map(|index| r2i.try_map(*index)))
220 .flatten()
221 .map(|index| i2o.try_map(index))
222 .collect::<Option<Vec<_>>>()?;
223
224 let l2i = self.l2i_col_mapping();
227 let r2i = self.r2i_col_mapping();
228 let full_out_col_num = self.internal_column_num();
229 let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
230
231 let either_or_both = self.add_which_join_key_to_pk();
232
233 for (lk, rk) in eq_indexes {
234 match either_or_both {
235 EitherOrBoth::Left(_) => {
236 if let Some(rk) = r2i.try_map(rk)
242 && let Some(out_k) = i2o.try_map(rk)
243 {
244 pk_indices.retain(|&x| x != out_k);
245 }
246 if let Some(lk) = l2i.try_map(lk) {
248 let out_k = i2o.try_map(lk)?;
249 if !pk_indices.contains(&out_k) {
250 pk_indices.push(out_k);
251 }
252 }
253 }
254 EitherOrBoth::Right(_) => {
255 if let Some(lk) = l2i.try_map(lk)
258 && let Some(out_k) = i2o.try_map(lk)
259 {
260 pk_indices.retain(|&x| x != out_k);
261 }
262 if let Some(rk) = r2i.try_map(rk) {
264 let out_k = i2o.try_map(rk)?;
265 if !pk_indices.contains(&out_k) {
266 pk_indices.push(out_k);
267 }
268 }
269 }
270 EitherOrBoth::Both(_, _) => {
271 if let Some(lk) = l2i.try_map(lk) {
272 let out_k = i2o.try_map(lk)?;
273 if !pk_indices.contains(&out_k) {
274 pk_indices.push(out_k);
275 }
276 }
277 if let Some(rk) = r2i.try_map(rk) {
278 let out_k = i2o.try_map(rk)?;
279 if !pk_indices.contains(&out_k) {
280 pk_indices.push(out_k);
281 }
282 }
283 }
284 };
285 }
286 Some(pk_indices)
287 }
288
289 fn ctx(&self) -> OptimizerContextRef {
290 self.left.ctx()
291 }
292
293 fn functional_dependency(&self) -> FunctionalDependencySet {
294 let left_len = self.left.schema().len();
295 let right_len = self.right.schema().len();
296 let left_fd_set = self.left.functional_dependency().clone();
297 let right_fd_set = self.right.functional_dependency().clone();
298
299 let full_out_col_num = self.internal_column_num();
300
301 let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
302 ColIndexMapping::with_shift_offset(left_len, 0)
303 .composite(&ColIndexMapping::identity(full_out_col_num))
304 .rewrite_functional_dependency_set(left_fd_set)
305 };
306 let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
307 ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
308 .rewrite_functional_dependency_set(right_fd_set)
309 };
310 let fd_set: FunctionalDependencySet = match self.join_type {
311 JoinType::Inner | JoinType::AsofInner => {
312 let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
313 for i in &self.on.conjunctions {
314 if let Some((col, _)) = i.as_eq_const() {
315 fd_set.add_constant_columns(&[col.index()])
316 } else if let Some((left, right)) = i.as_eq_cond() {
317 fd_set.add_functional_dependency_by_column_indices(
318 &[left.index()],
319 &[right.index()],
320 );
321 fd_set.add_functional_dependency_by_column_indices(
322 &[right.index()],
323 &[left.index()],
324 );
325 }
326 }
327 get_new_left_fd_set(left_fd_set)
328 .into_dependencies()
329 .into_iter()
330 .chain(get_new_right_fd_set(right_fd_set).into_dependencies())
331 .for_each(|fd| fd_set.add_functional_dependency(fd));
332 fd_set
333 }
334 JoinType::LeftOuter | JoinType::AsofLeftOuter => get_new_left_fd_set(left_fd_set),
335 JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
336 JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
337 JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
338 JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
339 JoinType::Unspecified => unreachable!(),
340 };
341 ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
342 .rewrite_functional_dependency_set(fd_set)
343 }
344}
345
346impl<PlanRef> Join<PlanRef> {
347 pub fn decompose(self) -> (PlanRef, PlanRef, Condition, JoinType, Vec<usize>) {
348 (
349 self.left,
350 self.right,
351 self.on,
352 self.join_type,
353 self.output_indices,
354 )
355 }
356}
357
358impl<PlanRef: GenericPlanRef> Join<PlanRef> {
359 pub fn full_out_col_num(left_len: usize, right_len: usize, join_type: JoinType) -> usize {
360 match join_type {
361 JoinType::Inner
362 | JoinType::LeftOuter
363 | JoinType::RightOuter
364 | JoinType::FullOuter
365 | JoinType::AsofInner
366 | JoinType::AsofLeftOuter => left_len + right_len,
367 JoinType::LeftSemi | JoinType::LeftAnti => left_len,
368 JoinType::RightSemi | JoinType::RightAnti => right_len,
369 JoinType::Unspecified => unreachable!(),
370 }
371 }
372
373 pub fn with_full_output(
374 left: PlanRef,
375 right: PlanRef,
376 join_type: JoinType,
377 on: Condition,
378 ) -> Self {
379 let out_column_num =
380 Self::full_out_col_num(left.schema().len(), right.schema().len(), join_type);
381 Self {
382 left,
383 right,
384 join_type,
385 on,
386 output_indices: (0..out_column_num).collect(),
387 }
388 }
389
390 pub fn internal_column_num(&self) -> usize {
391 Self::full_out_col_num(
392 self.left.schema().len(),
393 self.right.schema().len(),
394 self.join_type,
395 )
396 }
397
398 pub fn is_full_out(&self) -> bool {
399 self.output_indices.len() == self.internal_column_num()
400 }
401
402 pub fn i2l_col_mapping(&self) -> ColIndexMapping {
404 let left_len = self.left.schema().len();
405 let right_len = self.right.schema().len();
406
407 match self.join_type {
408 JoinType::Inner
409 | JoinType::LeftOuter
410 | JoinType::RightOuter
411 | JoinType::FullOuter
412 | JoinType::AsofInner
413 | JoinType::AsofLeftOuter => {
414 ColIndexMapping::identity_or_none(left_len + right_len, left_len)
415 }
416
417 JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::identity(left_len),
418 JoinType::RightSemi | JoinType::RightAnti => {
419 ColIndexMapping::empty(right_len, left_len)
420 }
421 JoinType::Unspecified => unreachable!(),
422 }
423 }
424
425 pub fn i2r_col_mapping(&self) -> ColIndexMapping {
427 let left_len = self.left.schema().len();
428 let right_len = self.right.schema().len();
429
430 match self.join_type {
431 JoinType::Inner
432 | JoinType::LeftOuter
433 | JoinType::RightOuter
434 | JoinType::FullOuter
435 | JoinType::AsofInner
436 | JoinType::AsofLeftOuter => {
437 ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
438 }
439 JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::empty(left_len, right_len),
440 JoinType::RightSemi | JoinType::RightAnti => ColIndexMapping::identity(right_len),
441 JoinType::Unspecified => unreachable!(),
442 }
443 }
444
445 pub fn i2l_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
447 let left_len = self.left.schema().len();
448 let right_len = self.right.schema().len();
449
450 ColIndexMapping::identity_or_none(left_len + right_len, left_len)
451 }
452
453 pub fn i2r_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
455 let left_len = self.left.schema().len();
456 let right_len = self.right.schema().len();
457
458 ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
459 }
460
461 pub fn l2i_col_mapping(&self) -> ColIndexMapping {
463 self.i2l_col_mapping()
464 .inverse()
465 .expect("must be invertible")
466 }
467
468 pub fn r2i_col_mapping(&self) -> ColIndexMapping {
470 self.i2r_col_mapping()
471 .inverse()
472 .expect("must be invertible")
473 }
474
475 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
477 ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
478 }
479
480 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
482 ColIndexMapping::new(
485 self.output_indices.iter().map(|x| Some(*x)).collect(),
486 self.internal_column_num(),
487 )
488 }
489
490 pub fn add_which_join_key_to_pk(&self) -> EitherOrBoth<(), ()> {
491 match self.join_type {
492 JoinType::Inner | JoinType::AsofInner => {
493 EitherOrBoth::Left(())
497 }
498 JoinType::LeftOuter
499 | JoinType::LeftSemi
500 | JoinType::LeftAnti
501 | JoinType::AsofLeftOuter => EitherOrBoth::Left(()),
502 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
503 EitherOrBoth::Right(())
504 }
505 JoinType::FullOuter => EitherOrBoth::Both((), ()),
506 JoinType::Unspecified => unreachable!(),
507 }
508 }
509
510 pub fn concat_schema(&self) -> Schema {
511 Schema::new(
512 [
513 self.left.schema().fields.clone(),
514 self.right.schema().fields.clone(),
515 ]
516 .concat(),
517 )
518 }
519}
520
521pub fn push_down_into_join(
527 predicate: &mut Condition,
528 left_col_num: usize,
529 right_col_num: usize,
530 ty: JoinType,
531 push_temporal_predicate: bool,
532) -> (Condition, Condition, Condition) {
533 let (left, right) = push_down_to_inputs(
534 predicate,
535 left_col_num,
536 right_col_num,
537 can_push_left_from_filter(ty),
538 can_push_right_from_filter(ty),
539 push_temporal_predicate,
540 );
541
542 let on = if can_push_on_from_filter(ty) {
543 let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
544
545 if push_temporal_predicate {
546 Condition { conjunctions }
547 } else {
548 let on = Condition {
550 conjunctions: conjunctions
551 .extract_if(.., |expr| expr.count_nows() == 0)
552 .collect(),
553 };
554 predicate.conjunctions = conjunctions;
555 on
556 }
557 } else {
558 Condition::true_cond()
559 };
560 (left, right, on)
561}
562
563pub fn push_down_join_condition(
568 on_condition: &mut Condition,
569 left_col_num: usize,
570 right_col_num: usize,
571 ty: JoinType,
572 push_temporal_predicate: bool,
573) -> (Condition, Condition) {
574 push_down_to_inputs(
575 on_condition,
576 left_col_num,
577 right_col_num,
578 can_push_left_from_on(ty),
579 can_push_right_from_on(ty),
580 push_temporal_predicate,
581 )
582}
583
584fn push_down_to_inputs(
589 predicate: &mut Condition,
590 left_col_num: usize,
591 right_col_num: usize,
592 push_left: bool,
593 push_right: bool,
594 push_temporal_predicate: bool,
595) -> (Condition, Condition) {
596 let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
597 let (mut left, right, mut others) = if push_temporal_predicate {
598 Condition { conjunctions }.split(left_col_num, right_col_num)
599 } else {
600 let temporal_filter_cons = conjunctions
601 .extract_if(.., |e| e.count_nows() != 0)
602 .collect_vec();
603 let (left, right, mut others) =
604 Condition { conjunctions }.split(left_col_num, right_col_num);
605
606 others.conjunctions.extend(temporal_filter_cons);
607 (left, right, others)
608 };
609
610 if !push_left {
611 others.conjunctions.extend(left);
612 left = Condition::true_cond();
613 };
614
615 let right = if push_right {
616 let mut mapping = ColIndexMapping::with_shift_offset(
617 left_col_num + right_col_num,
618 -(left_col_num as isize),
619 );
620 right.rewrite_expr(&mut mapping)
621 } else {
622 others.conjunctions.extend(right);
623 Condition::true_cond()
624 };
625
626 predicate.conjunctions = others.conjunctions;
627
628 (left, right)
629}
630
631pub fn can_push_left_from_filter(ty: JoinType) -> bool {
632 matches!(
633 ty,
634 JoinType::Inner
635 | JoinType::LeftOuter
636 | JoinType::LeftSemi
637 | JoinType::LeftAnti
638 | JoinType::AsofInner
639 | JoinType::AsofLeftOuter
640 )
641}
642
643pub fn can_push_right_from_filter(ty: JoinType) -> bool {
644 matches!(
645 ty,
646 JoinType::Inner
647 | JoinType::RightOuter
648 | JoinType::RightSemi
649 | JoinType::RightAnti
650 | JoinType::AsofInner
651 )
652}
653
654pub fn can_push_on_from_filter(ty: JoinType) -> bool {
655 matches!(
656 ty,
657 JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi
658 )
659}
660
661pub fn can_push_left_from_on(ty: JoinType) -> bool {
662 matches!(
663 ty,
664 JoinType::Inner
665 | JoinType::RightOuter
666 | JoinType::LeftSemi
667 | JoinType::AsofInner
668 | JoinType::AsofLeftOuter
669 )
670}
671
672pub fn can_push_right_from_on(ty: JoinType) -> bool {
673 matches!(
674 ty,
675 JoinType::Inner
676 | JoinType::LeftOuter
677 | JoinType::RightSemi
678 | JoinType::AsofInner
679 | JoinType::AsofLeftOuter
680 )
681}