risingwave_frontend/optimizer/plan_node/
logical_intersect.rs1use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17
18use super::utils::impl_distill_by_unit;
19use super::{
20 ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::{
25 ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
26 ToStreamContext, generic,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct LogicalIntersect {
34 pub base: PlanBase<Logical>,
35 core: generic::Intersect<PlanRef>,
36}
37
38impl LogicalIntersect {
39 pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
40 assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
41 let core = generic::Intersect { all, inputs };
42 let base = PlanBase::new_logical_with_core(&core);
43 LogicalIntersect { base, core }
44 }
45
46 pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
47 LogicalIntersect::new(all, inputs).into()
48 }
49
50 pub fn all(&self) -> bool {
51 self.core.all
52 }
53}
54
55impl PlanTreeNode for LogicalIntersect {
56 fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
57 self.core.inputs.clone().into_iter().collect()
58 }
59
60 fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
61 Self::new(self.all(), inputs.to_vec()).into()
62 }
63}
64
65impl_distill_by_unit!(LogicalIntersect, core, "LogicalIntersect");
66
67impl ColPrunable for LogicalIntersect {
68 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
69 let new_inputs = self
70 .inputs()
71 .iter()
72 .map(|input| input.prune_col(required_cols, ctx))
73 .collect_vec();
74 self.clone_with_inputs(&new_inputs)
75 }
76}
77
78impl ExprRewritable for LogicalIntersect {}
79
80impl ExprVisitable for LogicalIntersect {}
81
82impl PredicatePushdown for LogicalIntersect {
83 fn predicate_pushdown(
84 &self,
85 predicate: Condition,
86 ctx: &mut PredicatePushdownContext,
87 ) -> PlanRef {
88 let new_inputs = self
89 .inputs()
90 .iter()
91 .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
92 .collect_vec();
93 self.clone_with_inputs(&new_inputs)
94 }
95}
96
97impl ToBatch for LogicalIntersect {
98 fn to_batch(&self) -> Result<PlanRef> {
99 unimplemented!()
100 }
101}
102
103impl ToStream for LogicalIntersect {
104 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
105 unimplemented!()
106 }
107
108 fn logical_rewrite_for_stream(
109 &self,
110 _ctx: &mut RewriteStreamContext,
111 ) -> Result<(PlanRef, ColIndexMapping)> {
112 unimplemented!()
113 }
114}