1use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet, VecDeque};
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Schema;
21use risingwave_pb::plan_common::JoinType;
22
23use super::utils::{Distill, childless_record};
24use super::{
25 ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalJoin, LogicalPlanNodeType,
26 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeBinary, PlanTreeNodeUnary,
27 PredicatePushdown, ToBatch, ToStream,
28};
29use crate::error::{ErrorCode, Result, RwError};
30use crate::expr::{ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall};
31use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
32use crate::optimizer::plan_node::{
33 ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
34 ToStreamContext,
35};
36use crate::optimizer::plan_visitor::TemporalJoinValidator;
37use crate::optimizer::property::FunctionalDependencySet;
38use crate::utils::{
39 ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay,
40 ConnectedComponentLabeller,
41};
42
/// A logical plan node that joins an arbitrary number of inputs under a single condition.
///
/// Columns are addressed in two index spaces:
/// * the *internal* space, which is the concatenation of every input's columns in input
///   order (this is the space `on` is expressed in), and
/// * the *output* space, which is the internal space projected through `output_indices`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalMultiJoin {
    /// Common plan-node properties (context, schema, stream key, functional dependencies).
    pub base: PlanBase<Logical>,
    /// The joined relations, in the order that defines the internal column space.
    inputs: Vec<PlanRef>,
    /// Join condition, expressed over internal column indices.
    on: Condition,
    /// For every output column, the internal column index it is taken from.
    output_indices: Vec<usize>,
    /// Mapping from internal column index to output column index, built from
    /// `output_indices` with `ColIndexMapping::with_remaining_columns`.
    inner2output: ColIndexMapping,
    /// For every internal column index, the pair `(input_idx, column_idx_within_that_input)`.
    inner_o2i_mapping: Vec<(usize, usize)>,
    /// One mapping per input: column index within that input -> internal column index.
    inner_i2o_mappings: Vec<ColIndexMapping>,
}
63
64impl Distill for LogicalMultiJoin {
65 fn distill<'a>(&self) -> XmlNode<'a> {
66 let fields = (self.inputs.iter())
67 .flat_map(|input| input.schema().fields.clone())
68 .collect();
69 let input_schema = Schema { fields };
70 let cond = Pretty::display(&ConditionDisplay {
71 condition: self.on(),
72 input_schema: &input_schema,
73 });
74 childless_record("LogicalMultiJoin", vec![("on", cond)])
75 }
76}
77
/// Incrementally flattens a tree of inner joins, filters and pure projections into the
/// pieces needed to construct a [`LogicalMultiJoin`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalMultiJoinBuilder {
    /// For every column the builder currently outputs, the internal column index (index into
    /// the concatenation of all `inputs`' columns) it is taken from.
    output_indices: Vec<usize>,
    /// Collected predicates, already rewritten to internal column indices.
    conjunctions: Vec<ExprImpl>,
    /// The flattened join inputs collected so far.
    inputs: Vec<PlanRef>,
    /// Total number of columns across all `inputs`.
    tot_input_col_num: usize,
}
87
88impl LogicalMultiJoinBuilder {
89 pub fn add_predicate_above(&mut self, exprs: impl Iterator<Item = ExprImpl>) {
92 let mut mapping = ColIndexMapping::new(
93 self.output_indices.iter().map(|i| Some(*i)).collect(),
94 self.tot_input_col_num,
95 );
96 self.conjunctions
97 .extend(exprs.map(|expr| mapping.rewrite_expr(expr)));
98 }
99
100 pub fn build(self) -> LogicalMultiJoin {
101 LogicalMultiJoin::new(
102 self.inputs,
103 Condition {
104 conjunctions: self.conjunctions,
105 },
106 self.output_indices,
107 )
108 }
109
110 pub fn into_parts(self) -> (Vec<usize>, Vec<ExprImpl>, Vec<PlanRef>, usize) {
111 (
112 self.output_indices,
113 self.conjunctions,
114 self.inputs,
115 self.tot_input_col_num,
116 )
117 }
118
119 pub fn new(plan: PlanRef) -> LogicalMultiJoinBuilder {
120 match plan.node_type() {
121 LogicalPlanNodeType::LogicalJoin => Self::with_join(plan),
122 LogicalPlanNodeType::LogicalFilter => Self::with_filter(plan),
123 LogicalPlanNodeType::LogicalProject => Self::with_project(plan),
124 _ => Self::with_input(plan),
125 }
126 }
127
128 fn with_join(plan: PlanRef) -> LogicalMultiJoinBuilder {
129 let join: &LogicalJoin = plan.as_logical_join().unwrap();
130 if join.join_type() != JoinType::Inner {
131 return Self::with_input(plan);
132 }
133 let left = join.left();
134 let right = join.right();
135
136 if TemporalJoinValidator::exist_dangling_temporal_scan(left.clone()) {
137 return Self::with_input(plan);
138 }
139 if TemporalJoinValidator::exist_dangling_temporal_scan(right.clone()) {
140 return Self::with_input(plan);
141 }
142
143 let mut builder = Self::new(left);
144
145 let (r_output_indices, r_conjunctions, mut r_inputs, r_tot_input_col_num) =
146 Self::new(right).into_parts();
147
148 let mut shift_mapping = ColIndexMapping::with_shift_offset(
151 r_tot_input_col_num,
152 builder.tot_input_col_num as isize,
153 );
154 builder.inputs.append(&mut r_inputs);
155 builder.tot_input_col_num += r_tot_input_col_num;
156
157 builder.conjunctions.extend(
158 r_conjunctions
159 .into_iter()
160 .map(|expr| shift_mapping.rewrite_expr(expr)),
161 );
162
163 builder.output_indices.extend(
164 r_output_indices
165 .into_iter()
166 .map(|idx| shift_mapping.map(idx)),
167 );
168 builder.add_predicate_above(join.on().conjunctions.iter().cloned());
169
170 builder.output_indices = join
171 .output_indices()
172 .iter()
173 .map(|idx| builder.output_indices[*idx])
174 .collect();
175 builder
176 }
177
178 fn with_filter(plan: PlanRef) -> LogicalMultiJoinBuilder {
179 let filter: &LogicalFilter = plan.as_logical_filter().unwrap();
180 let mut builder = Self::new(filter.input());
181 builder.add_predicate_above(filter.predicate().conjunctions.iter().cloned());
182 builder
183 }
184
185 fn with_project(plan: PlanRef) -> LogicalMultiJoinBuilder {
186 let proj: &LogicalProject = plan.as_logical_project().unwrap();
187 let output_indices = match proj.try_as_projection() {
188 Some(output_indices) => output_indices,
189 None => return Self::with_input(plan),
190 };
191 let mut builder = Self::new(proj.input());
192 builder.output_indices = output_indices
193 .into_iter()
194 .map(|i| builder.output_indices[i])
195 .collect();
196 builder
197 }
198
199 fn with_input(input: PlanRef) -> LogicalMultiJoinBuilder {
200 LogicalMultiJoinBuilder {
201 output_indices: (0..input.schema().len()).collect_vec(),
202 conjunctions: vec![],
203 tot_input_col_num: input.schema().len(),
204 inputs: vec![input],
205 }
206 }
207
208 pub fn inputs(&self) -> &[PlanRef] {
209 self.inputs.as_ref()
210 }
211}
212impl LogicalMultiJoin {
213 pub(crate) fn new(inputs: Vec<PlanRef>, on: Condition, output_indices: Vec<usize>) -> Self {
214 let input_schemas = inputs
215 .iter()
216 .map(|input| input.schema().clone())
217 .collect_vec();
218
219 let (inner_o2i_mapping, tot_col_num) = {
220 let mut inner_o2i_mapping = vec![];
221 let mut tot_col_num = 0;
222 for (input_idx, input_schema) in input_schemas.iter().enumerate() {
223 tot_col_num += input_schema.len();
224 for (col_idx, _field) in input_schema.fields().iter().enumerate() {
225 inner_o2i_mapping.push((input_idx, col_idx));
226 }
227 }
228 (inner_o2i_mapping, tot_col_num)
229 };
230 let inner2output = ColIndexMapping::with_remaining_columns(&output_indices, tot_col_num);
231
232 let schema = Schema {
233 fields: output_indices
234 .iter()
235 .map(|idx| inner_o2i_mapping[*idx])
236 .map(|(input_idx, col_idx)| input_schemas[input_idx].fields()[col_idx].clone())
237 .collect(),
238 };
239
240 let inner_i2o_mappings = {
241 let mut i2o_maps = vec![];
242 for input_schema in &input_schemas {
243 let map = vec![None; input_schema.len()];
244 i2o_maps.push(map);
245 }
246 for (out_idx, (input_idx, in_idx)) in inner_o2i_mapping.iter().enumerate() {
247 i2o_maps[*input_idx][*in_idx] = Some(out_idx);
248 }
249
250 i2o_maps
251 .into_iter()
252 .map(|map| ColIndexMapping::new(map, tot_col_num))
253 .collect_vec()
254 };
255
256 let pk_indices = Self::derive_stream_key(&inputs, &inner_i2o_mappings, &inner2output);
257 let functional_dependency = {
258 let mut fd_set = FunctionalDependencySet::new(tot_col_num);
259 let mut column_cnt: usize = 0;
260 let id_mapping = ColIndexMapping::identity(tot_col_num);
261 for i in &inputs {
262 let mapping =
263 ColIndexMapping::with_shift_offset(i.schema().len(), column_cnt as isize)
264 .composite(&id_mapping);
265 mapping
266 .rewrite_functional_dependency_set(i.functional_dependency().clone())
267 .into_dependencies()
268 .into_iter()
269 .for_each(|fd| fd_set.add_functional_dependency(fd));
270 column_cnt += i.schema().len();
271 }
272 for i in &on.conjunctions {
273 if let Some((col, _)) = i.as_eq_const() {
274 fd_set.add_constant_columns(&[col.index()])
275 } else if let Some((left, right)) = i.as_eq_cond() {
276 fd_set.add_functional_dependency_by_column_indices(
277 &[left.index()],
278 &[right.index()],
279 );
280 fd_set.add_functional_dependency_by_column_indices(
281 &[right.index()],
282 &[left.index()],
283 );
284 }
285 }
286 ColIndexMapping::with_remaining_columns(&output_indices, tot_col_num)
287 .rewrite_functional_dependency_set(fd_set)
288 };
289 let base =
290 PlanBase::new_logical(inputs[0].ctx(), schema, pk_indices, functional_dependency);
291
292 Self {
293 base,
294 inputs,
295 on,
296 output_indices,
297 inner2output,
298 inner_o2i_mapping,
299 inner_i2o_mappings,
300 }
301 }
302
303 fn derive_stream_key(
304 inputs: &[PlanRef],
305 inner_i2o_mappings: &[ColIndexMapping],
306 inner2output: &ColIndexMapping,
307 ) -> Option<Vec<usize>> {
308 let mut pk_indices = vec![];
310 for (i, input) in inputs.iter().enumerate() {
311 let input_stream_key = input.stream_key()?;
312 for input_pk_idx in input_stream_key {
313 pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx));
314 }
315 }
316 pk_indices
317 .into_iter()
318 .map(|col_idx| inner2output.try_map(col_idx))
319 .collect::<Option<Vec<_>>>()
320 }
321
    /// Returns the join condition, which is expressed over internal column indices.
    pub fn on(&self) -> &Condition {
        &self.on
    }
326
327 pub fn clone_with_cond(&self, cond: Condition) -> Self {
329 Self::new(self.inputs.clone(), cond, self.output_indices.clone())
330 }
331}
332
333impl PlanTreeNode<Logical> for LogicalMultiJoin {
334 fn inputs(&self) -> smallvec::SmallVec<[PlanRef; 2]> {
335 let mut vec = smallvec::SmallVec::new();
336 vec.extend(self.inputs.clone());
337 vec
338 }
339
340 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
341 Self::new(
342 inputs.to_vec(),
343 self.on().clone(),
344 self.output_indices.clone(),
345 )
346 .into()
347 }
348}
349
350impl LogicalMultiJoin {
351 pub fn as_reordered_left_deep_join(&self, join_ordering: &[usize]) -> PlanRef {
352 assert_eq!(join_ordering.len(), self.inputs.len());
353 assert!(!join_ordering.is_empty());
354
355 let base_plan = self.inputs[join_ordering[0]].clone();
356
357 let mut output = join_ordering[1..]
360 .iter()
361 .fold(base_plan, |join_chain, &index| {
362 LogicalJoin::new(
363 join_chain,
364 self.inputs[index].clone(),
365 JoinType::Inner,
366 Condition::true_cond(),
367 )
368 .into()
369 });
370
371 let total_col_num = self.inner2output.source_size();
372 let reorder_mapping = {
373 let mut reorder_mapping = vec![None; total_col_num];
374 join_ordering
375 .iter()
376 .cloned()
377 .flat_map(|input_idx| {
378 (0..self.inputs[input_idx].schema().len())
379 .map(move |col_idx| self.inner_i2o_mappings[input_idx].map(col_idx))
380 })
381 .enumerate()
382 .for_each(|(tar, src)| reorder_mapping[src] = Some(tar));
383 reorder_mapping
384 };
385 output =
386 LogicalProject::with_out_col_idx(output, reorder_mapping.iter().map(|i| i.unwrap()))
387 .into();
388
389 output = LogicalFilter::create(output, self.on.clone());
392 output =
393 LogicalProject::with_out_col_idx(output, self.output_indices.iter().cloned()).into();
394
395 output
396 }
397
398 #[expect(clippy::doc_overindented_list_items)]
399 pub(crate) fn heuristic_ordering(&self) -> Result<Vec<usize>> {
424 let mut labeller = ConnectedComponentLabeller::new(self.inputs.len());
425
426 let (eq_join_conditions, _) = self.on.clone().split_by_input_col_nums(
427 &self.input_col_nums(),
428 true,
430 );
431
432 for k in eq_join_conditions.keys() {
434 labeller.add_edge(k.0, k.1);
435 }
436
437 let mut edge_sets: Vec<_> = labeller.into_edge_sets();
438
439 edge_sets.sort_by_key(|a| std::cmp::Reverse(a.len()));
441
442 let mut join_ordering = vec![];
443
444 for component in edge_sets {
445 let mut eq_cond_edges: Vec<(usize, usize)> = component.into_iter().collect();
446
447 eq_cond_edges.sort();
449
450 if eq_cond_edges.is_empty() {
451 break;
453 };
454
455 let edge = eq_cond_edges.remove(0);
456 join_ordering.extend(&vec![edge.0, edge.1]);
457
458 while !eq_cond_edges.is_empty() {
459 let mut found = vec![];
460 for (idx, edge) in eq_cond_edges.iter().enumerate() {
461 if join_ordering.contains(&edge.1) && join_ordering.contains(&edge.0) {
464 found.push(idx);
465 } else {
466 let new_input = if join_ordering.contains(&edge.0) {
469 edge.1
470 } else if join_ordering.contains(&edge.1) {
471 edge.0
472 } else {
473 continue;
474 };
475 join_ordering.push(new_input);
476 found.push(idx);
477 }
478 }
479 if found.is_empty() {
483 return Err(RwError::from(ErrorCode::InternalError(
484 "Connecting edge not found in join connected subgraph".into(),
485 )));
486 }
487 let mut idx = 0;
488 eq_cond_edges.retain(|_| {
489 let keep = !found.contains(&idx);
490 idx += 1;
491 keep
492 });
493 }
494 }
495 for i in 0..self.inputs.len() {
497 if !join_ordering.contains(&i) {
498 join_ordering.push(i);
499 }
500 }
501 Ok(join_ordering)
502 }
503
    /// Expands this multi-join into a bushy tree of inner cross joins, searching for a join
    /// tree of low height, and tops it with a reorder projection, a filter carrying the
    /// (derived) join condition, and a projection applying `output_indices`.
    ///
    /// The search is a breadth-first exploration of graph states. In each state the node with
    /// the fewest relations (ties broken by lower tree height) is merged into each of its
    /// neighbours in turn; nodes without relations are set aside as "isolated" and are
    /// cross-joined onto the final tree at the end.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the join graph is empty, if no tree was produced, or if
    /// building the join plan from the tree fails.
    pub fn as_bushy_tree_join(&self) -> Result<PlanRef> {
        let (nodes, condition) = self.get_join_graph()?;

        if nodes.is_empty() {
            return Err(RwError::from(ErrorCode::InternalError(
                "empty multi-join graph".into(),
            )));
        }

        // Best complete tree found so far, together with the isolated nodes of its state.
        let mut optimized_bushy_tree: Option<(GraphNode, Vec<GraphNode>)> = None;
        // Work queue of states: (remaining graph nodes, nodes set aside as isolated).
        let mut que: VecDeque<(BTreeMap<usize, GraphNode>, Vec<GraphNode>)> =
            VecDeque::from([(nodes, vec![])]);

        while let Some((mut nodes, mut isolated)) = que.pop_front() {
            if nodes.len() == 1 {
                // Everything has been merged into one node: a complete candidate tree.
                let node = nodes.into_values().next().unwrap();

                // Only a strictly lower tree replaces the current best.
                if let Some((old, _)) = &optimized_bushy_tree {
                    if node.join_tree.height < old.join_tree.height {
                        optimized_bushy_tree = Some((node, isolated));
                    }
                } else {
                    optimized_bushy_tree = Some((node, isolated));
                }
                continue;
            } else if nodes.is_empty() {
                // Every node ended up isolated; use the last isolated one as the base tree
                // unless a candidate already exists.
                if optimized_bushy_tree.is_none() {
                    let base = isolated.pop().unwrap();
                    optimized_bushy_tree = Some((base, isolated));
                }
                continue;
            }

            // Pick the node with the fewest relations, breaking ties by lower tree height.
            // `min_by` returns the first minimum, and the map iterates in ascending key order.
            let (idx, _) = nodes
                .iter()
                .min_by(
                    |(_, x), (_, y)| match x.relations.len().cmp(&y.relations.len()) {
                        Ordering::Less => Ordering::Less,
                        Ordering::Greater => Ordering::Greater,
                        Ordering::Equal => x.join_tree.height.cmp(&y.join_tree.height),
                    },
                )
                .unwrap();
            let n_id = *idx;

            let n = nodes.get(&n_id).unwrap();
            if n.relations.is_empty() {
                // No neighbour to merge with: set the node aside and revisit the rest later.
                let n = nodes.remove(&n_id).unwrap();
                isolated.push(n);
                que.push_back((nodes, isolated));
                continue;
            }

            // Neighbours of the chosen node, ordered by (tree height, id).
            let mut relations = nodes
                .get_mut(&n_id)
                .unwrap()
                .relations
                .iter()
                .cloned()
                .collect_vec();
            relations.sort_by(|a, b| {
                let a = nodes.get(a).unwrap();
                let b = nodes.get(b).unwrap();
                match a.join_tree.height.cmp(&b.join_tree.height) {
                    Ordering::Equal => a.id.cmp(&b.id),
                    other => other,
                }
            });

            // Explore merging the chosen node into each neighbour, each on its own copy of
            // the graph.
            for merge_node_id in &relations {
                let mut nodes = nodes.clone();
                let n = nodes.remove(&n_id).unwrap();

                // Re-attach the other neighbours of the removed node to the merge target.
                for adj_node_id in &n.relations {
                    if adj_node_id != merge_node_id {
                        let adj_node = nodes.get_mut(adj_node_id).unwrap();
                        adj_node.relations.remove(&n_id);
                        adj_node.relations.insert(*merge_node_id);
                        let merge_node = nodes.get_mut(merge_node_id).unwrap();
                        merge_node.relations.insert(*adj_node_id);
                    }
                }

                let merge_node = nodes.get_mut(merge_node_id).unwrap();
                merge_node.relations.remove(&n_id);
                let l_tree = n.join_tree.clone();
                let r_tree = std::mem::take(&mut merge_node.join_tree);
                let new_height = usize::max(l_tree.height, r_tree.height) + 1;

                // Prune: the merged subtree would already be higher than the best complete
                // tree found so far.
                if let Some(min_height) = optimized_bushy_tree
                    .as_ref()
                    .map(|(t, _)| t.join_tree.height)
                    && min_height < new_height
                {
                    continue;
                }

                merge_node.join_tree = JoinTreeNode {
                    idx: None,
                    left: Some(Box::new(l_tree)),
                    right: Some(Box::new(r_tree)),
                    height: new_height,
                };

                que.push_back((nodes, isolated.clone()));
            }
        }

        // Order in which the inputs appear as leaves of the built plan (filled by
        // `create_logical_join`).
        let mut join_ordering = vec![];
        let mut output = if let Some((optimized_bushy_tree, isolated)) = optimized_bushy_tree {
            // Attach each isolated node as the right child of a new root above the chosen tree.
            let optimized_bushy_tree =
                isolated
                    .into_iter()
                    .fold(optimized_bushy_tree, |chain, n| GraphNode {
                        id: n.id,
                        relations: BTreeSet::default(),
                        join_tree: JoinTreeNode {
                            height: chain.join_tree.height.max(n.join_tree.height) + 1,
                            idx: None,
                            left: Some(Box::new(chain.join_tree)),
                            right: Some(Box::new(n.join_tree)),
                        },
                    });
            self.create_logical_join(optimized_bushy_tree.join_tree, &mut join_ordering)?
        } else {
            return Err(RwError::from(ErrorCode::InternalError(
                "no plan remain".into(),
            )));
        };

        // For every internal column index, its position in the built join's output.
        let total_col_num = self.inner2output.source_size();
        let reorder_mapping = {
            let mut reorder_mapping = vec![None; total_col_num];

            join_ordering
                .iter()
                .cloned()
                .flat_map(|input_idx| {
                    (0..self.inputs[input_idx].schema().len())
                        .map(move |col_idx| self.inner_i2o_mappings[input_idx].map(col_idx))
                })
                .enumerate()
                .for_each(|(tar, src)| reorder_mapping[src] = Some(tar));
            reorder_mapping
        };
        // Restore the internal column order so the condition's indices are valid again.
        output =
            LogicalProject::with_out_col_idx(output, reorder_mapping.iter().map(|i| i.unwrap()))
                .into();

        // Apply the condition returned by `get_join_graph`, then the output projection.
        output = LogicalFilter::create(output, condition);
        output =
            LogicalProject::with_out_col_idx(output, self.output_indices.iter().cloned()).into();
        Ok(output)
    }
667
668 pub(crate) fn input_col_nums(&self) -> Vec<usize> {
669 self.inputs.iter().map(|i| i.schema().len()).collect()
670 }
671
672 fn get_join_graph(&self) -> Result<(BTreeMap<usize, GraphNode>, Condition)> {
674 let mut nodes: BTreeMap<_, _> = (0..self.inputs.len())
675 .map(|idx| GraphNode {
676 id: idx,
677 relations: BTreeSet::default(),
678 join_tree: JoinTreeNode {
679 idx: Some(idx),
680 left: None,
681 right: None,
682 height: 0,
683 },
684 })
685 .enumerate()
686 .collect();
687
688 let condition = self.on.clone();
689 let condition = self.eq_condition_derivation(condition)?;
690 let (eq_join_conditions, _) = condition
691 .clone()
692 .split_by_input_col_nums(&self.input_col_nums(), true);
693
694 for ((src, dst), _) in eq_join_conditions {
695 nodes.get_mut(&src).unwrap().relations.insert(dst);
696 nodes.get_mut(&dst).unwrap().relations.insert(src);
697 }
698
699 Ok((nodes, condition))
700 }
701
702 fn eq_condition_derivation(&self, mut condition: Condition) -> Result<Condition> {
704 let (eq_join_conditions, _) = condition
705 .clone()
706 .split_by_input_col_nums(&self.input_col_nums(), true);
707
708 let mut new_conj: BTreeMap<usize, BTreeSet<usize>> = BTreeMap::new();
709 let mut input_ref_map = BTreeMap::new();
710
711 for con in eq_join_conditions.values() {
712 for conj in &con.conjunctions {
713 let (l, r) = conj.as_eq_cond().unwrap();
714 new_conj.entry(l.index).or_default().insert(r.index);
715 new_conj.entry(r.index).or_default().insert(l.index);
716 input_ref_map.insert(l.index, Some(l));
717 input_ref_map.insert(r.index, Some(r));
718 }
719 }
720
721 let mut new_pairs = BTreeSet::new();
722
723 for conjs in new_conj.values() {
724 if conjs.len() < 2 {
725 continue;
726 }
727
728 let conjs = conjs.iter().copied().collect_vec();
729 for i in 0..conjs.len() {
730 for j in i + 1..conjs.len() {
731 if !new_conj.get(&conjs[i]).unwrap().contains(&conjs[j]) {
732 if conjs[i] < conjs[j] {
733 new_pairs.insert((conjs[i], conjs[j]));
734 } else {
735 new_pairs.insert((conjs[j], conjs[i]));
736 }
737 }
738 }
739 }
740 }
741 for (i, j) in new_pairs {
742 condition
743 .conjunctions
744 .push(ExprImpl::FunctionCall(Box::new(FunctionCall::new(
745 ExprType::Equal,
746 vec![
747 ExprImpl::InputRef(Box::new(
748 input_ref_map.get(&i).unwrap().as_ref().unwrap().clone(),
749 )),
750 ExprImpl::InputRef(Box::new(
751 input_ref_map.get(&j).unwrap().as_ref().unwrap().clone(),
752 )),
753 ],
754 )?)));
755 }
756 Ok(condition)
757 }
758
759 fn create_logical_join(
761 &self,
762 mut join_tree: JoinTreeNode,
763 join_ordering: &mut Vec<usize>,
764 ) -> Result<PlanRef> {
765 Ok(match (join_tree.left.take(), join_tree.right.take()) {
766 (Some(l), Some(r)) => LogicalJoin::new(
767 self.create_logical_join(*l, join_ordering)?,
768 self.create_logical_join(*r, join_ordering)?,
769 JoinType::Inner,
770 Condition::true_cond(),
771 )
772 .into(),
773 (None, None) => {
774 if let Some(idx) = join_tree.idx {
775 join_ordering.push(idx);
776 self.inputs[idx].clone()
777 } else {
778 return Err(RwError::from(ErrorCode::InternalError(
779 "id of the leaf node not found in the join tree".into(),
780 )));
781 }
782 }
783 (_, _) => {
784 return Err(RwError::from(ErrorCode::InternalError(
785 "only leaf node can have None subtree".into(),
786 )));
787 }
788 })
789 }
790}
791
/// A node of a binary join tree.
///
/// A leaf has `idx` set and no children; an inner node has both children and no `idx`.
#[derive(Clone, Default, Debug)]
struct JoinTreeNode {
    /// Index of the join input this leaf stands for; `None` for inner nodes.
    idx: Option<usize>,
    /// Left subtree, if any.
    left: Option<Box<Self>>,
    /// Right subtree, if any.
    right: Option<Box<Self>>,
    /// Height of the subtree rooted at this node; leaves have height 0.
    height: usize,
}
800
/// A node of the join graph used by the bushy-tree search.
#[derive(Clone, Debug)]
struct GraphNode {
    /// Key of this node in the graph's node map; initially the index of the join input.
    id: usize,
    /// The join tree accumulated in this node so far.
    join_tree: JoinTreeNode,
    /// Ids of the graph nodes this node is connected to.
    relations: BTreeSet<usize>,
}
808
impl ToStream for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }

    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
830
impl ToBatch for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
839
impl ColPrunable for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
848
impl ExprRewritable<Logical> for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
857
impl ExprVisitable for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
866
impl PredicatePushdown for LogicalMultiJoin {
    /// Not supported for `LogicalMultiJoin`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn predicate_pushdown(
        &self,
        _predicate: Condition,
        _ctx: &mut PredicatePushdownContext,
    ) -> PlanRef {
        panic!(
            "Method not available for `LogicalMultiJoin` which is a placeholder node with \
             a temporary lifetime. It only facilitates join reordering during logical planning."
        )
    }
}
879
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_pb::expr::expr_node::Type;

    use super::*;
    use crate::expr::InputRef;
    use crate::optimizer::optimizer_context::OptimizerContext;
    use crate::optimizer::plan_node::LogicalValues;
    use crate::optimizer::plan_node::generic::GenericPlanRef;
    use crate::optimizer::property::FunctionalDependency;

    /// Checks the functional dependencies derived by `LogicalMultiJoin::new`.
    ///
    /// Inputs:  t1: [v0, v1], t2: [v2, v3, v4], t3: [v5, v6]
    /// FDs:     v0 --> v1, v2 --> { v3, v4 }, {} --> v5
    /// On:      v0 = 0 AND v1 = v3 AND v4 = v5
    /// Output:  [v0, v1, v4, v5]
    #[tokio::test]
    async fn fd_derivation_multi_join() {
        let ctx = OptimizerContext::mock();
        let t1 = {
            let fields: Vec<Field> = vec![
                Field::with_name(DataType::Int32, "v0"),
                Field::with_name(DataType::Int32, "v1"),
            ];
            let mut values = LogicalValues::new(vec![], Schema { fields }, ctx.clone());
            // v0 --> v1
            values
                .base
                .functional_dependency_mut()
                .add_functional_dependency_by_column_indices(&[0], &[1]);
            values
        };
        let t2 = {
            let fields: Vec<Field> = vec![
                Field::with_name(DataType::Int32, "v2"),
                Field::with_name(DataType::Int32, "v3"),
                Field::with_name(DataType::Int32, "v4"),
            ];
            let mut values = LogicalValues::new(vec![], Schema { fields }, ctx.clone());
            // v2 --> { v3, v4 }
            values
                .base
                .functional_dependency_mut()
                .add_functional_dependency_by_column_indices(&[0], &[1, 2]);
            values
        };
        let t3 = {
            let fields: Vec<Field> = vec![
                Field::with_name(DataType::Int32, "v5"),
                Field::with_name(DataType::Int32, "v6"),
            ];
            let mut values = LogicalValues::new(vec![], Schema { fields }, ctx);
            // {} --> v5
            values
                .base
                .functional_dependency_mut()
                .add_functional_dependency_by_column_indices(&[], &[0]);
            values
        };
        // Input references use internal indices: t1 owns 0..=1, t2 owns 2..=4, t3 owns 5..=6.
        let on: ExprImpl = FunctionCall::new(
            Type::And,
            vec![
                // v0 = 0
                FunctionCall::new(
                    Type::Equal,
                    vec![
                        InputRef::new(0, DataType::Int32).into(),
                        ExprImpl::literal_int(0),
                    ],
                )
                .unwrap()
                .into(),
                FunctionCall::new(
                    Type::And,
                    vec![
                        // v1 = v3
                        FunctionCall::new(
                            Type::Equal,
                            vec![
                                InputRef::new(1, DataType::Int32).into(),
                                InputRef::new(3, DataType::Int32).into(),
                            ],
                        )
                        .unwrap()
                        .into(),
                        // v4 = v5
                        FunctionCall::new(
                            Type::Equal,
                            vec![
                                InputRef::new(4, DataType::Int32).into(),
                                InputRef::new(5, DataType::Int32).into(),
                            ],
                        )
                        .unwrap()
                        .into(),
                    ],
                )
                .unwrap()
                .into(),
            ],
        )
        .unwrap()
        .into();
        // Output columns: [v0, v1, v4, v5]
        let multi_join = LogicalMultiJoin::new(
            vec![t1.into(), t2.into(), t3.into()],
            Condition::with_expr(on),
            vec![0, 1, 4, 5],
        );
        // Expected dependencies, in output column indices (v0 = 0, v1 = 1, v4 = 2, v5 = 3).
        let expected_fd_set: HashSet<_> = [
            // v0 --> v1
            FunctionalDependency::with_indices(4, &[0], &[1]),
            // {} --> v0, v5
            FunctionalDependency::with_indices(4, &[], &[0, 3]),
            // v4 --> v5
            FunctionalDependency::with_indices(4, &[2], &[3]),
            // v5 --> v4
            FunctionalDependency::with_indices(4, &[3], &[2]),
        ]
        .into_iter()
        .collect();
        let fd_set: HashSet<_> = multi_join
            .functional_dependency()
            .as_dependencies()
            .iter()
            .cloned()
            .collect();
        assert_eq!(expected_fd_set, fd_set);
    }
}