// risingwave_frontend/optimizer/plan_node/logical_except.rs
use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17
18use super::utils::impl_distill_by_unit;
19use super::{
20 ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanRef;
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
27 ToStreamContext, generic,
28};
29use crate::utils::{ColIndexMapping, Condition};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalExcept {
35 pub base: PlanBase<Logical>,
36 core: generic::Except<PlanRef>,
37}
38
39impl LogicalExcept {
40 pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
41 assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
42 let core = generic::Except { all, inputs };
43 let base = PlanBase::new_logical_with_core(&core);
44 LogicalExcept { base, core }
45 }
46
47 pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
48 LogicalExcept::new(all, inputs).into()
49 }
50
51 pub fn all(&self) -> bool {
52 self.core.all
53 }
54}
55
56impl PlanTreeNode for LogicalExcept {
57 fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
58 self.core.inputs.clone().into_iter().collect()
59 }
60
61 fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
62 Self::new(self.all(), inputs.to_vec()).into()
63 }
64}
65
66impl_distill_by_unit!(LogicalExcept, core, "LogicalExcept");
67
68impl ColPrunable for LogicalExcept {
69 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
70 let new_inputs = self
71 .inputs()
72 .iter()
73 .map(|input| input.prune_col(required_cols, ctx))
74 .collect_vec();
75 self.clone_with_inputs(&new_inputs)
76 }
77}
78
79impl ExprRewritable for LogicalExcept {}
80
81impl ExprVisitable for LogicalExcept {}
82
83impl PredicatePushdown for LogicalExcept {
84 fn predicate_pushdown(
85 &self,
86 predicate: Condition,
87 ctx: &mut PredicatePushdownContext,
88 ) -> PlanRef {
89 let new_inputs = self
90 .inputs()
91 .iter()
92 .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
93 .collect_vec();
94 self.clone_with_inputs(&new_inputs)
95 }
96}
97
98impl ToBatch for LogicalExcept {
99 fn to_batch(&self) -> Result<PlanRef> {
100 unimplemented!()
101 }
102}
103
104impl ToStream for LogicalExcept {
105 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
106 unimplemented!()
107 }
108
109 fn logical_rewrite_for_stream(
110 &self,
111 _ctx: &mut RewriteStreamContext,
112 ) -> Result<(PlanRef, ColIndexMapping)> {
113 unimplemented!()
114 }
115}