risingwave_frontend/optimizer/plan_node/
logical_intersect.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17
18use super::utils::impl_distill_by_unit;
19use super::{
20    ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::{
25    ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
26    ToStreamContext, generic,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30/// `LogicalIntersect` returns the intersect of the rows of its inputs.
31/// If `all` is false, it needs to eliminate duplicates.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct LogicalIntersect {
34    pub base: PlanBase<Logical>,
35    core: generic::Intersect<PlanRef>,
36}
37
38impl LogicalIntersect {
39    pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
40        assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
41        let core = generic::Intersect { all, inputs };
42        let base = PlanBase::new_logical_with_core(&core);
43        LogicalIntersect { base, core }
44    }
45
46    pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
47        LogicalIntersect::new(all, inputs).into()
48    }
49
50    pub fn all(&self) -> bool {
51        self.core.all
52    }
53}
54
55impl PlanTreeNode for LogicalIntersect {
56    fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
57        self.core.inputs.clone().into_iter().collect()
58    }
59
60    fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
61        Self::new(self.all(), inputs.to_vec()).into()
62    }
63}
64
65impl_distill_by_unit!(LogicalIntersect, core, "LogicalIntersect");
66
67impl ColPrunable for LogicalIntersect {
68    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
69        let new_inputs = self
70            .inputs()
71            .iter()
72            .map(|input| input.prune_col(required_cols, ctx))
73            .collect_vec();
74        self.clone_with_inputs(&new_inputs)
75    }
76}
77
78impl ExprRewritable for LogicalIntersect {}
79
80impl ExprVisitable for LogicalIntersect {}
81
82impl PredicatePushdown for LogicalIntersect {
83    fn predicate_pushdown(
84        &self,
85        predicate: Condition,
86        ctx: &mut PredicatePushdownContext,
87    ) -> PlanRef {
88        let new_inputs = self
89            .inputs()
90            .iter()
91            .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
92            .collect_vec();
93        self.clone_with_inputs(&new_inputs)
94    }
95}
96
97impl ToBatch for LogicalIntersect {
98    fn to_batch(&self) -> Result<PlanRef> {
99        unimplemented!()
100    }
101}
102
103impl ToStream for LogicalIntersect {
104    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
105        unimplemented!()
106    }
107
108    fn logical_rewrite_for_stream(
109        &self,
110        _ctx: &mut RewriteStreamContext,
111    ) -> Result<(PlanRef, ColIndexMapping)> {
112        unimplemented!()
113    }
114}