risingwave_frontend/optimizer/plan_node/logical_changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::expr_visitable::ExprVisitable;
16use super::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP, GenericPlanRef};
17use super::utils::impl_distill_by_unit;
18use super::{
19    BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
20    LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
21    RewriteStreamContext, StreamChangeLog, StreamPlanRef, ToBatch, ToStream, ToStreamContext,
22    gen_filter_and_pushdown, generic,
23};
24use crate::error::ErrorCode::BindError;
25use crate::error::Result;
26use crate::optimizer::plan_node::generic::PhysicalPlanRef;
27use crate::optimizer::property::Distribution;
28use crate::utils::{ColIndexMapping, Condition};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct LogicalChangeLog {
32    pub base: PlanBase<Logical>,
33    core: generic::ChangeLog<PlanRef>,
34}
35
36impl LogicalChangeLog {
37    pub fn create(input: PlanRef) -> PlanRef {
38        Self::new(input, true, true).into()
39    }
40
41    pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
42        let core = generic::ChangeLog::new(input, need_op, need_changelog_row_id);
43        Self::with_core(core)
44    }
45
46    pub fn with_core(core: generic::ChangeLog<PlanRef>) -> Self {
47        let base = PlanBase::new_logical_with_core(&core);
48        LogicalChangeLog { base, core }
49    }
50}
51
52impl PlanTreeNodeUnary<Logical> for LogicalChangeLog {
53    fn input(&self) -> PlanRef {
54        self.core.input.clone()
55    }
56
57    fn clone_with_input(&self, input: PlanRef) -> Self {
58        Self::new(input, self.core.need_op, self.core.need_changelog_row_id)
59    }
60
61    fn rewrite_with_input(
62        &self,
63        input: PlanRef,
64        input_col_change: ColIndexMapping,
65    ) -> (Self, ColIndexMapping) {
66        let changelog = Self::new(input, self.core.need_op, true);
67
68        let out_col_change = if self.core.need_op {
69            let (mut output_vec, len) = input_col_change.into_parts();
70            output_vec.push(Some(len));
71            ColIndexMapping::new(output_vec, len + 1)
72        } else {
73            input_col_change
74        };
75
76        let (mut output_vec, len) = out_col_change.into_parts();
77        let out_col_change = if self.core.need_changelog_row_id {
78            output_vec.push(Some(len));
79            ColIndexMapping::new(output_vec, len + 1)
80        } else {
81            ColIndexMapping::new(output_vec, len + 1)
82        };
83
84        (changelog, out_col_change)
85    }
86}
87
88impl_plan_tree_node_for_unary! { Logical, LogicalChangeLog}
89impl_distill_by_unit!(LogicalChangeLog, core, "LogicalChangeLog");
90
91impl ExprRewritable<Logical> for LogicalChangeLog {}
92
93impl ExprVisitable for LogicalChangeLog {}
94
95impl PredicatePushdown for LogicalChangeLog {
96    fn predicate_pushdown(
97        &self,
98        predicate: Condition,
99        ctx: &mut super::PredicatePushdownContext,
100    ) -> PlanRef {
101        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
102    }
103}
104
105impl ColPrunable for LogicalChangeLog {
106    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
107        let fields = self.schema().fields();
108        let mut need_op = false;
109        let mut need_changelog_row_id = false;
110        let new_required_cols: Vec<_> = required_cols
111            .iter()
112            .filter_map(|a| {
113                if let Some(f) = fields.get(*a) {
114                    if f.name == CHANGELOG_OP {
115                        need_op = true;
116                        None
117                    } else if f.name == _CHANGELOG_ROW_ID {
118                        need_changelog_row_id = true;
119                        None
120                    } else {
121                        Some(*a)
122                    }
123                } else {
124                    Some(*a)
125                }
126            })
127            .collect();
128
129        let new_input = self.input().prune_col(&new_required_cols, ctx);
130        Self::new(new_input, need_op, need_changelog_row_id).into()
131    }
132}
133
134impl ToBatch for LogicalChangeLog {
135    fn to_batch(&self) -> Result<BatchPlanRef> {
136        Err(BindError("With changelog cte only support with create mv/sink".to_owned()).into())
137    }
138}
139
140impl ToStream for LogicalChangeLog {
141    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
142        let input = self.input().to_stream(ctx)?;
143        let dist = input.distribution();
144        let distribution_keys = match dist {
145            Distribution::HashShard(distribution_keys)
146            | Distribution::UpstreamHashShard(distribution_keys, _) => distribution_keys.clone(),
147            Distribution::Single => {
148                vec![]
149            }
150            _ => {
151                return Err(BindError(format!(
152                    "ChangeLog requires input to be hash distributed, single, but got {:?}",
153                    dist
154                ))
155                .into());
156            }
157        };
158        let core = self.core.clone_with_input(input);
159        let row_id_index = self.schema().fields().len() - 1;
160        let plan = StreamChangeLog::new_with_dist(
161            core,
162            Distribution::HashShard(vec![row_id_index]),
163            distribution_keys.into_iter().map(|k| k as u32).collect(),
164        )
165        .into();
166
167        Ok(plan)
168    }
169
170    fn logical_rewrite_for_stream(
171        &self,
172        ctx: &mut RewriteStreamContext,
173    ) -> Result<(PlanRef, ColIndexMapping)> {
174        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
175        let (changelog, out_col_change) = self.rewrite_with_input(input, input_col_change);
176        Ok((changelog.into(), out_col_change))
177    }
178}