risingwave_frontend/optimizer/plan_node/stream_upstream_sink_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16use pretty_xmlish::XmlNode;
17use risingwave_common::catalog::Schema;
18use risingwave_pb::stream_plan::UpstreamSinkUnionNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use crate::OptimizerContextRef;
23use crate::expr::ExprImpl;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
26use crate::optimizer::plan_node::{ExprRewritable, PlanBase, Stream, StreamNode};
27use crate::optimizer::property::{
28    Distribution, FunctionalDependencySet, MonotonicityDerivation, MonotonicityMap,
29    WatermarkColumns, analyze_monotonicity,
30};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
/// Constructor inputs for [`StreamUpstreamSinkUnion`].
///
/// Consumed by `StreamUpstreamSinkUnion::new_inner` and reproduced from an existing node by
/// `StreamUpstreamSinkUnion::rebuild_inner`. Equality and hashing ignore `ctx`.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
struct UpstreamSinkUnionInner {
    // The optimizer context does not take part in `PartialEq`/`Hash`.
    #[educe(PartialEq(ignore), Hash(ignore))]
    ctx: OptimizerContextRef,
    /// Output schema of the node.
    schema: Schema,
    /// Stream key, passed through unchanged to the stream plan base.
    stream_key: Option<Vec<usize>>,
    /// Output distribution.
    dist: Distribution,
    /// Append-only or upsert, as decided by `StreamUpstreamSinkUnion::new`.
    stream_kind: StreamKind,
    // Expressions of the table's generated columns. They are used only to derive this node's
    // monotonicity map and `watermark_columns`, which in turn affect the derivation of downstream
    // operators. The projection that actually computes the generated columns runs in the sink
    // fragment rather than on the table, so only the expressions are retained here to determine
    // the `watermark_columns`.
    generated_column_exprs: Option<Vec<ExprImpl>>,
}
47
/// Leaf stream plan node on a table that unions the streams of sinks writing into that table
/// (its "upstream sinks").
///
/// The node has no plan inputs. Its upstreams start out empty and are attached to the executor
/// as upstream sinks are created later.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamUpstreamSinkUnion {
    pub base: PlanBase<Stream>,
    /// Expressions of the table's generated columns (may be `None`). They are kept only to
    /// derive monotonicity and watermark columns; they are not evaluated by this node.
    pub generated_column_exprs: Option<Vec<ExprImpl>>,
}
53
54impl StreamUpstreamSinkUnion {
55    pub fn new(
56        ctx: OptimizerContextRef,
57        schema: &Schema,
58        stream_key: Option<&[usize]>,
59        dist: Distribution,
60        append_only: bool,
61        user_defined_pk: bool,
62        generated_column_exprs: Option<Vec<ExprImpl>>,
63    ) -> Self {
64        // For upstream sink creating, we require that if the table doesn't define pk or the table is `append_only`, the
65        // upstream sink must be `append_only`.
66        let stream_kind = if append_only || !user_defined_pk {
67            StreamKind::AppendOnly
68        } else {
69            StreamKind::Upsert
70        };
71        let inner = UpstreamSinkUnionInner {
72            ctx,
73            schema: schema.clone(),
74            stream_key: stream_key.map(|keys| keys.to_vec()),
75            dist,
76            stream_kind,
77            generated_column_exprs,
78        };
79
80        Self::new_inner(inner)
81    }
82
83    fn new_inner(inner: UpstreamSinkUnionInner) -> Self {
84        let mut out_watermark_columns = WatermarkColumns::new();
85        let mut out_monotonicity_map = MonotonicityMap::new();
86        // reference `StreamProject::new_inner()`
87        if let Some(ref generated_column_exprs) = inner.generated_column_exprs {
88            for (expr_idx, expr) in generated_column_exprs.iter().enumerate() {
89                if let MonotonicityDerivation::Inherent(monotonicity) = analyze_monotonicity(expr) {
90                    out_monotonicity_map.insert(expr_idx, monotonicity);
91                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
92                        out_watermark_columns.insert(expr_idx, inner.ctx.next_watermark_group_id());
93                    }
94                }
95            }
96        }
97
98        let field_num = inner.schema.fields().len();
99        let base = PlanBase::new_stream(
100            inner.ctx,
101            inner.schema,
102            inner.stream_key, // maybe incorrect
103            FunctionalDependencySet::new(field_num),
104            inner.dist,
105            inner.stream_kind,
106            false, // emit_on_window_close
107            out_watermark_columns,
108            out_monotonicity_map,
109        );
110
111        Self {
112            base,
113            generated_column_exprs: inner.generated_column_exprs,
114        }
115    }
116
117    fn rebuild_inner(&self) -> UpstreamSinkUnionInner {
118        UpstreamSinkUnionInner {
119            ctx: self.base.ctx(),
120            schema: self.base.schema().clone(),
121            stream_key: self.base.stream_key().map(|keys| keys.to_vec()),
122            dist: self.base.distribution().clone(),
123            stream_kind: self.base.stream_kind(),
124            generated_column_exprs: self.generated_column_exprs.clone(),
125        }
126    }
127}
128
129impl Distill for StreamUpstreamSinkUnion {
130    fn distill<'a>(&self) -> XmlNode<'a> {
131        let verbose = self.base.ctx().is_explain_verbose();
132        let mut vec = Vec::new();
133        if verbose && let Some(ow) = watermark_pretty(self.watermark_columns(), self.schema()) {
134            vec.push(("output_watermarks", ow));
135        }
136        childless_record("StreamUpstreamSinkUnion", vec)
137    }
138}
139
140impl_plan_tree_node_for_leaf! { Stream, StreamUpstreamSinkUnion }
141
142impl StreamNode for StreamUpstreamSinkUnion {
143    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
144        PbNodeBody::UpstreamSinkUnion(Box::new(UpstreamSinkUnionNode {
145            // When the table is created, there are no upstreams, so this is empty. When upstream sinks are created
146            // later, the actual upstreams in executor will increase, and during recovery, it will be filled with the
147            // actual upstream infos.
148            init_upstreams: vec![],
149        }))
150    }
151}
152
153impl ExprRewritable<Stream> for StreamUpstreamSinkUnion {
154    fn has_rewritable_expr(&self) -> bool {
155        self.generated_column_exprs.is_some()
156    }
157
158    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> super::PlanRef<Stream> {
159        let mut inner = self.rebuild_inner();
160        inner.generated_column_exprs = inner
161            .generated_column_exprs
162            .map(|exprs| exprs.into_iter().map(|expr| r.rewrite_expr(expr)).collect());
163        Self::new_inner(inner).into()
164    }
165}
166
167impl ExprVisitable for StreamUpstreamSinkUnion {
168    fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
169        if let Some(exprs) = self.generated_column_exprs.as_ref() {
170            exprs.iter().for_each(|expr| {
171                v.visit_expr(expr);
172            });
173        }
174    }
175}