risingwave_frontend/optimizer/plan_node/
stream_changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::ChangeLogNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::expr_visitable::ExprVisitable;
19use super::stream::StreamPlanNodeMetadata;
20use super::stream::prelude::PhysicalPlanRef;
21use super::utils::impl_distill_by_unit;
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, generic};
23use crate::optimizer::StreamPlanRef as PlanRef;
24use crate::optimizer::property::{Distribution, MonotonicityMap, StreamKind};
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamChangeLog {
29    pub base: PlanBase<Stream>,
30    core: generic::ChangeLog<PlanRef>,
31    distribution_keys: Vec<u32>,
32}
33
34impl StreamChangeLog {
35    pub fn new_with_dist(
36        core: generic::ChangeLog<PlanRef>,
37        dist: Distribution,
38        distribution_keys: Vec<u32>,
39    ) -> Self {
40        let input = core.input.clone();
41        let base = PlanBase::new_stream_with_core(
42            &core,
43            dist,
44            // The changelog will convert all delete/update to insert, so it must be append-only here.
45            StreamKind::AppendOnly,
46            input.emit_on_window_close(),
47            input.watermark_columns().clone(),
48            MonotonicityMap::new(), // TODO: derive monotonicity
49        );
50        StreamChangeLog {
51            base,
52            core,
53            distribution_keys,
54        }
55    }
56}
57
58impl PlanTreeNodeUnary<Stream> for StreamChangeLog {
59    fn input(&self) -> PlanRef {
60        self.core.input.clone()
61    }
62
63    fn clone_with_input(&self, input: PlanRef) -> Self {
64        let mut core = self.core.clone();
65        core.input = input;
66        Self::new_with_dist(
67            core,
68            self.base.distribution().clone(),
69            self.distribution_keys.clone(),
70        )
71    }
72}
73
74impl_plan_tree_node_for_unary! { Stream, StreamChangeLog }
75impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");
76
77impl StreamNode for StreamChangeLog {
78    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
79        PbNodeBody::Changelog(Box::new(ChangeLogNode {
80            need_op: self.core.need_op,
81            distribution_keys: self.distribution_keys.clone(),
82        }))
83    }
84}
85
86impl ExprRewritable<Stream> for StreamChangeLog {}
87
88impl ExprVisitable for StreamChangeLog {}