risingwave_frontend/optimizer/plan_node/
logical_cte_ref.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail_not_implemented;
16use risingwave_common::util::column_index_mapping::ColIndexMapping;
17
18use super::expr_visitable::ExprVisitable;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalProject, PlanBase,
22    PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream,
23    ToStreamContext, generic,
24};
25use crate::PlanRef;
26use crate::binder::ShareId;
27use crate::error::Result;
28use crate::utils::Condition;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct LogicalCteRef {
32    pub base: PlanBase<Logical>,
33    core: generic::CteRef<PlanRef>,
34}
35
36impl LogicalCteRef {
37    pub fn new(share_id: ShareId, base_plan: PlanRef) -> Self {
38        let core = generic::CteRef::new(share_id, base_plan);
39        let base = PlanBase::new_logical_with_core(&core);
40        Self { base, core }
41    }
42
43    pub fn create(share_id: ShareId, base_plan: PlanRef) -> PlanRef {
44        Self::new(share_id, base_plan).into()
45    }
46}
47
48impl_plan_tree_node_for_leaf! {LogicalCteRef}
49
50impl_distill_by_unit! {LogicalCteRef, core, "LogicalCteRef"}
51
52impl ExprRewritable for LogicalCteRef {}
53
54impl ExprVisitable for LogicalCteRef {}
55
56impl ColPrunable for LogicalCteRef {
57    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
58        LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into()
59    }
60}
61
62impl PredicatePushdown for LogicalCteRef {
63    fn predicate_pushdown(
64        &self,
65        _predicate: Condition,
66        _ctx: &mut PredicatePushdownContext,
67    ) -> PlanRef {
68        self.clone().into()
69    }
70}
71
72impl ToBatch for LogicalCteRef {
73    fn to_batch(&self) -> Result<PlanRef> {
74        bail_not_implemented!(
75            issue = 15135,
76            "recursive CTE not supported for to_batch of LogicalCteRef"
77        )
78    }
79}
80
81impl ToStream for LogicalCteRef {
82    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
83        bail_not_implemented!(
84            issue = 15135,
85            "recursive CTE not supported for to_stream of LogicalCteRef"
86        )
87    }
88
89    fn logical_rewrite_for_stream(
90        &self,
91        _ctx: &mut RewriteStreamContext,
92    ) -> Result<(PlanRef, ColIndexMapping)> {
93        bail_not_implemented!(
94            issue = 15135,
95            "recursive CTE not supported for logical_rewrite_for_stream of LogicalCteRef"
96        )
97    }
98}