risingwave_frontend/optimizer/plan_node/generic/
changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Str, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use super::{DistillUnit, GenericPlanNode};
20use crate::OptimizerContextRef;
21use crate::optimizer::plan_node::stream::prelude::GenericPlanRef;
22use crate::optimizer::plan_node::utils::childless_record;
23use crate::optimizer::property::FunctionalDependencySet;
24use crate::utils::ColIndexMappingRewriteExt;
25
/// Name of the `Int16` column that [`ChangeLog`]'s schema appends when `need_op` is set.
pub const CHANGELOG_OP: &str = "changelog_op";
/// Name of the `Serial` column that [`ChangeLog`]'s schema appends when
/// `need_changelog_row_id` is set.
pub const _CHANGELOG_ROW_ID: &str = "_changelog_row_id";
/// Generic plan node that exposes its input's columns, optionally followed by a
/// `changelog_op` column and a `_changelog_row_id` column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
    /// The plan node whose columns form the leading part of the output schema.
    pub input: PlanRef,
    /// Whether the output contains the `changelog_op` column.
    ///
    /// It is `false` when the op is not part of the output result, for example:
    /// `create materialized view mv1 as with sub as changelog from t1 select v1 from sub;`
    pub need_op: bool,
    /// Whether the output contains the `_changelog_row_id` column.
    ///
    /// Before rewrite: `false` if there is no changelog row id in the output result.
    /// After rewrite: always `true`.
    pub need_changelog_row_id: bool,
}
37impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
38    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
39        childless_record(name, vec![])
40    }
41}
42impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
43    pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
44        ChangeLog {
45            input,
46            need_op,
47            need_changelog_row_id,
48        }
49    }
50
51    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
52        let mut map = vec![None; self.input.schema().len()];
53        (0..self.input.schema().len()).for_each(|i| map[i] = Some(i));
54        ColIndexMapping::new(map, self.schema().len())
55    }
56}
57impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
58    fn schema(&self) -> Schema {
59        let mut fields = self.input.schema().fields.clone();
60        if self.need_op {
61            fields.push(Field::with_name(
62                risingwave_common::types::DataType::Int16,
63                CHANGELOG_OP,
64            ));
65        }
66        if self.need_changelog_row_id {
67            fields.push(Field::with_name(
68                risingwave_common::types::DataType::Serial,
69                _CHANGELOG_ROW_ID,
70            ));
71        }
72        Schema::new(fields)
73    }
74
75    fn stream_key(&self) -> Option<Vec<usize>> {
76        if self.need_changelog_row_id {
77            let keys = vec![self.schema().len() - 1];
78            Some(keys)
79        } else {
80            None
81        }
82    }
83
84    fn ctx(&self) -> OptimizerContextRef {
85        self.input.ctx()
86    }
87
88    fn functional_dependency(&self) -> FunctionalDependencySet {
89        let i2o = self.i2o_col_mapping();
90        i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
91    }
92}