risingwave_frontend/optimizer/plan_node/
logical_except.rs1use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17
18use super::utils::impl_distill_by_unit;
19use super::{
20 BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
21 PredicatePushdown, StreamPlanRef, ToBatch, ToStream,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::GenericPlanRef;
26use crate::optimizer::plan_node::{
27 ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
28 ToStreamContext, generic,
29};
30use crate::utils::{ColIndexMapping, Condition};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalExcept {
36 pub base: PlanBase<Logical>,
37 core: generic::Except<PlanRef>,
38}
39
40impl LogicalExcept {
41 pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
42 assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
43 let core = generic::Except { all, inputs };
44 let base = PlanBase::new_logical_with_core(&core);
45 LogicalExcept { base, core }
46 }
47
48 pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
49 LogicalExcept::new(all, inputs).into()
50 }
51
52 pub fn all(&self) -> bool {
53 self.core.all
54 }
55}
56
57impl PlanTreeNode<Logical> for LogicalExcept {
58 fn inputs(&self) -> smallvec::SmallVec<[PlanRef; 2]> {
59 self.core.inputs.clone().into_iter().collect()
60 }
61
62 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
63 Self::new(self.all(), inputs.to_vec()).into()
64 }
65}
66
67impl_distill_by_unit!(LogicalExcept, core, "LogicalExcept");
68
69impl ColPrunable for LogicalExcept {
70 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
71 let new_inputs = self
72 .inputs()
73 .iter()
74 .map(|input| input.prune_col(required_cols, ctx))
75 .collect_vec();
76 self.clone_with_inputs(&new_inputs)
77 }
78}
79
80impl ExprRewritable<Logical> for LogicalExcept {}
81
82impl ExprVisitable for LogicalExcept {}
83
84impl PredicatePushdown for LogicalExcept {
85 fn predicate_pushdown(
86 &self,
87 predicate: Condition,
88 ctx: &mut PredicatePushdownContext,
89 ) -> PlanRef {
90 let new_inputs = self
91 .inputs()
92 .iter()
93 .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
94 .collect_vec();
95 self.clone_with_inputs(&new_inputs)
96 }
97}
98
99impl ToBatch for LogicalExcept {
100 fn to_batch(&self) -> Result<BatchPlanRef> {
101 unimplemented!()
102 }
103}
104
105impl ToStream for LogicalExcept {
106 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
107 unimplemented!()
108 }
109
110 fn logical_rewrite_for_stream(
111 &self,
112 _ctx: &mut RewriteStreamContext,
113 ) -> Result<(PlanRef, ColIndexMapping)> {
114 unimplemented!()
115 }
116}