risingwave_frontend/optimizer/plan_node/
stream_changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::ChangeLogNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::expr_visitable::ExprVisitable;
19use super::stream::StreamPlanRef;
20use super::stream::prelude::PhysicalPlanRef;
21use super::utils::impl_distill_by_unit;
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, generic};
23use crate::PlanRef;
24use crate::optimizer::property::MonotonicityMap;
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamChangeLog {
29    pub base: PlanBase<Stream>,
30    core: generic::ChangeLog<PlanRef>,
31}
32
33impl StreamChangeLog {
34    pub fn new(core: generic::ChangeLog<PlanRef>) -> Self {
35        let input = core.input.clone();
36        let dist = input.distribution().clone();
37        // Filter executor won't change the append-only behavior of the stream.
38        let base = PlanBase::new_stream_with_core(
39            &core,
40            dist,
41            // The changelog will convert all delete/update to insert, so it must be true here.
42            true,
43            input.emit_on_window_close(),
44            input.watermark_columns().clone(),
45            MonotonicityMap::new(), // TODO: derive monotonicity
46        );
47        StreamChangeLog { base, core }
48    }
49}
50
51impl PlanTreeNodeUnary for StreamChangeLog {
52    fn input(&self) -> PlanRef {
53        self.core.input.clone()
54    }
55
56    fn clone_with_input(&self, input: PlanRef) -> Self {
57        let mut core = self.core.clone();
58        core.input = input;
59        Self::new(core)
60    }
61}
62
63impl_plan_tree_node_for_unary! { StreamChangeLog }
64impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");
65
66impl StreamNode for StreamChangeLog {
67    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
68        PbNodeBody::Changelog(Box::new(ChangeLogNode {
69            need_op: self.core.need_op,
70        }))
71    }
72}
73
74impl ExprRewritable for StreamChangeLog {}
75
76impl ExprVisitable for StreamChangeLog {}