risingwave_frontend/optimizer/plan_node/
logical_intersect.rs1use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17
18use super::utils::impl_distill_by_unit;
19use super::{
20 ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
21 ToBatch, ToStream,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
27 ToStreamContext, generic,
28};
29use crate::utils::{ColIndexMapping, Condition};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalIntersect {
35 pub base: PlanBase<Logical>,
36 core: generic::Intersect<PlanRef>,
37}
38
39impl LogicalIntersect {
40 pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
41 assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
42 let core = generic::Intersect { all, inputs };
43 let base = PlanBase::new_logical_with_core(&core);
44 LogicalIntersect { base, core }
45 }
46
47 pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
48 LogicalIntersect::new(all, inputs).into()
49 }
50
51 pub fn all(&self) -> bool {
52 self.core.all
53 }
54}
55
56impl PlanTreeNode<Logical> for LogicalIntersect {
57 fn inputs(&self) -> smallvec::SmallVec<[PlanRef; 2]> {
58 self.core.inputs.clone().into_iter().collect()
59 }
60
61 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
62 Self::new(self.all(), inputs.to_vec()).into()
63 }
64}
65
66impl_distill_by_unit!(LogicalIntersect, core, "LogicalIntersect");
67
68impl ColPrunable for LogicalIntersect {
69 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
70 let new_inputs = self
71 .inputs()
72 .iter()
73 .map(|input| input.prune_col(required_cols, ctx))
74 .collect_vec();
75 self.clone_with_inputs(&new_inputs)
76 }
77}
78
79impl ExprRewritable<Logical> for LogicalIntersect {}
80
81impl ExprVisitable for LogicalIntersect {}
82
83impl PredicatePushdown for LogicalIntersect {
84 fn predicate_pushdown(
85 &self,
86 predicate: Condition,
87 ctx: &mut PredicatePushdownContext,
88 ) -> PlanRef {
89 let new_inputs = self
90 .inputs()
91 .iter()
92 .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
93 .collect_vec();
94 self.clone_with_inputs(&new_inputs)
95 }
96}
97
98impl ToBatch for LogicalIntersect {
99 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
100 unimplemented!()
101 }
102}
103
104impl ToStream for LogicalIntersect {
105 fn to_stream(
106 &self,
107 _ctx: &mut ToStreamContext,
108 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
109 unimplemented!()
110 }
111
112 fn logical_rewrite_for_stream(
113 &self,
114 _ctx: &mut RewriteStreamContext,
115 ) -> Result<(PlanRef, ColIndexMapping)> {
116 unimplemented!()
117 }
118}