risingwave_frontend/optimizer/plan_node/
logical_changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16
17use super::expr_visitable::ExprVisitable;
18use super::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP, GenericPlanRef};
19use super::utils::impl_distill_by_unit;
20use super::{
21    ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalProject, PlanBase,
22    PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, StreamChangeLog, StreamRowIdGen,
23    ToBatch, ToStream, ToStreamContext, gen_filter_and_pushdown, generic,
24};
25use crate::PlanRef;
26use crate::error::ErrorCode::BindError;
27use crate::error::Result;
28use crate::expr::{ExprImpl, InputRef};
29use crate::optimizer::property::Distribution;
30use crate::utils::{ColIndexMapping, Condition};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct LogicalChangeLog {
34    pub base: PlanBase<Logical>,
35    core: generic::ChangeLog<PlanRef>,
36}
37
38impl LogicalChangeLog {
39    pub fn create(input: PlanRef) -> PlanRef {
40        Self::new(input, true, true).into()
41    }
42
43    pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
44        let core = generic::ChangeLog::new(input, need_op, need_changelog_row_id);
45        Self::with_core(core)
46    }
47
48    pub fn with_core(core: generic::ChangeLog<PlanRef>) -> Self {
49        let base = PlanBase::new_logical_with_core(&core);
50        LogicalChangeLog { base, core }
51    }
52}
53
54impl PlanTreeNodeUnary for LogicalChangeLog {
55    fn input(&self) -> PlanRef {
56        self.core.input.clone()
57    }
58
59    fn clone_with_input(&self, input: PlanRef) -> Self {
60        Self::new(input, self.core.need_op, self.core.need_changelog_row_id)
61    }
62
63    fn rewrite_with_input(
64        &self,
65        input: PlanRef,
66        input_col_change: ColIndexMapping,
67    ) -> (Self, ColIndexMapping) {
68        let changelog = Self::new(input, self.core.need_op, true);
69
70        let out_col_change = if self.core.need_op {
71            let (mut output_vec, len) = input_col_change.into_parts();
72            output_vec.push(Some(len));
73            ColIndexMapping::new(output_vec, len + 1)
74        } else {
75            input_col_change
76        };
77
78        let (mut output_vec, len) = out_col_change.into_parts();
79        let out_col_change = if self.core.need_changelog_row_id {
80            output_vec.push(Some(len));
81            ColIndexMapping::new(output_vec, len + 1)
82        } else {
83            ColIndexMapping::new(output_vec, len + 1)
84        };
85
86        (changelog, out_col_change)
87    }
88}
89
90impl_plan_tree_node_for_unary! {LogicalChangeLog}
91impl_distill_by_unit!(LogicalChangeLog, core, "LogicalChangeLog");
92
93impl ExprRewritable for LogicalChangeLog {}
94
95impl ExprVisitable for LogicalChangeLog {}
96
97impl PredicatePushdown for LogicalChangeLog {
98    fn predicate_pushdown(
99        &self,
100        predicate: Condition,
101        ctx: &mut super::PredicatePushdownContext,
102    ) -> PlanRef {
103        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
104    }
105}
106
107impl ColPrunable for LogicalChangeLog {
108    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
109        let fields = self.schema().fields();
110        let mut need_op = false;
111        let mut need_changelog_row_id = false;
112        let new_required_cols: Vec<_> = required_cols
113            .iter()
114            .filter_map(|a| {
115                if let Some(f) = fields.get(*a) {
116                    if f.name == CHANGELOG_OP {
117                        need_op = true;
118                        None
119                    } else if f.name == _CHANGELOG_ROW_ID {
120                        need_changelog_row_id = true;
121                        None
122                    } else {
123                        Some(*a)
124                    }
125                } else {
126                    Some(*a)
127                }
128            })
129            .collect();
130
131        let new_input = self.input().prune_col(&new_required_cols, ctx);
132        Self::new(new_input, need_op, need_changelog_row_id).into()
133    }
134}
135
136impl ToBatch for LogicalChangeLog {
137    fn to_batch(&self) -> Result<PlanRef> {
138        Err(BindError("With changelog cte only support with create mv/sink".to_owned()).into())
139    }
140}
141
142impl ToStream for LogicalChangeLog {
143    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
144        let new_input = self.input().to_stream(ctx)?;
145
146        let mut new_logical = self.core.clone();
147        new_logical.input = new_input;
148        let plan = StreamChangeLog::new(new_logical).into();
149        let row_id_index = self.schema().fields().len() - 1;
150        let plan = StreamRowIdGen::new_with_dist(
151            plan,
152            row_id_index,
153            Distribution::HashShard(vec![row_id_index]),
154        )
155        .into();
156
157        Ok(plan)
158    }
159
160    fn logical_rewrite_for_stream(
161        &self,
162        ctx: &mut RewriteStreamContext,
163    ) -> Result<(PlanRef, ColIndexMapping)> {
164        let original_schema = self.input().schema().clone();
165        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
166        let exprs = (0..original_schema.len())
167            .map(|x| {
168                ExprImpl::InputRef(
169                    InputRef::new(
170                        input_col_change.map(x),
171                        original_schema.fields[x].data_type.clone(),
172                    )
173                    .into(),
174                )
175            })
176            .collect_vec();
177        let project = LogicalProject::new(input.clone(), exprs);
178        let (project, out_col_change) = project.rewrite_with_input(input, input_col_change);
179        let (changelog, out_col_change) = self.rewrite_with_input(project.into(), out_col_change);
180        Ok((changelog.into(), out_col_change))
181    }
182}