risingwave_frontend/optimizer/plan_node/
logical_recursive_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail_not_implemented;
18use risingwave_common::util::column_index_mapping::ColIndexMapping;
19use smallvec::{SmallVec, smallvec};
20
21use super::expr_visitable::ExprVisitable;
22use super::generic::GenericPlanRef;
23use super::utils::{Distill, childless_record};
24use super::{
25    ColPrunable, ColumnPruningContext, ExprRewritable, Logical, PlanBase, PlanTreeNode,
26    PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream,
27    ToStreamContext, generic,
28};
29use crate::PlanRef;
30use crate::binder::ShareId;
31use crate::error::Result;
32use crate::utils::Condition;
33
34/// `LogicalRecursiveUnion` returns the union of the rows of its inputs.
35/// note: if `all` is false, it needs to eliminate duplicates.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct LogicalRecursiveUnion {
38    pub base: PlanBase<Logical>,
39    core: generic::RecursiveUnion<PlanRef>,
40}
41
42impl LogicalRecursiveUnion {
43    pub fn new(base_plan: PlanRef, recursive: PlanRef, id: ShareId) -> Self {
44        let core = generic::RecursiveUnion {
45            base: base_plan,
46            recursive,
47            id,
48        };
49        let base = PlanBase::new_logical_with_core(&core);
50        LogicalRecursiveUnion { base, core }
51    }
52
53    pub fn create(base_plan: PlanRef, recursive: PlanRef, id: ShareId) -> PlanRef {
54        Self::new(base_plan, recursive, id).into()
55    }
56
57    pub(super) fn pretty_fields(base: impl GenericPlanRef, name: &str) -> XmlNode<'_> {
58        childless_record(name, vec![("id", Pretty::debug(&base.id().0))])
59    }
60}
61
62impl PlanTreeNode for LogicalRecursiveUnion {
63    fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
64        smallvec![self.core.base.clone(), self.core.recursive.clone()]
65    }
66
67    fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
68        let mut inputs = inputs.iter().cloned();
69        Self::create(inputs.next().unwrap(), inputs.next().unwrap(), self.core.id)
70    }
71}
72
73impl Distill for LogicalRecursiveUnion {
74    fn distill<'a>(&self) -> XmlNode<'a> {
75        Self::pretty_fields(&self.base, "LogicalRecursiveUnion")
76    }
77}
78
79impl ColPrunable for LogicalRecursiveUnion {
80    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
81        let new_inputs = self
82            .inputs()
83            .iter()
84            .map(|input| input.prune_col(required_cols, ctx))
85            .collect_vec();
86        let new_plan = self.clone_with_inputs(&new_inputs);
87        self.ctx()
88            .insert_rcte_cache_plan(self.core.id, new_plan.clone());
89        new_plan
90    }
91}
92
93impl ExprRewritable for LogicalRecursiveUnion {}
94
95impl ExprVisitable for LogicalRecursiveUnion {}
96
97impl PredicatePushdown for LogicalRecursiveUnion {
98    fn predicate_pushdown(
99        &self,
100        predicate: Condition,
101        ctx: &mut PredicatePushdownContext,
102    ) -> PlanRef {
103        let new_inputs = self
104            .inputs()
105            .iter()
106            .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
107            .collect_vec();
108        let new_plan = self.clone_with_inputs(&new_inputs);
109        self.ctx()
110            .insert_rcte_cache_plan(self.core.id, new_plan.clone());
111        new_plan
112    }
113}
114
115impl ToBatch for LogicalRecursiveUnion {
116    fn to_batch(&self) -> Result<PlanRef> {
117        bail_not_implemented!(
118            issue = 15135,
119            "recursive CTE not supported for to_batch of LogicalRecursiveUnion"
120        )
121    }
122}
123
124impl ToStream for LogicalRecursiveUnion {
125    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
126        bail_not_implemented!(
127            issue = 15135,
128            "recursive CTE not supported for to_stream of LogicalRecursiveUnion"
129        )
130    }
131
132    fn logical_rewrite_for_stream(
133        &self,
134        _ctx: &mut RewriteStreamContext,
135    ) -> Result<(PlanRef, ColIndexMapping)> {
136        bail_not_implemented!(
137            issue = 15135,
138            "recursive CTE not supported for logical_rewrite_for_stream of LogicalRecursiveUnion"
139        )
140    }
141}