risingwave_frontend/optimizer/plan_node/
stream_dml.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
/// Stream plan node that is serialized as a `DmlNode` (see the `StreamNode` impl).
///
/// Per the comments in `StreamDml::new`, it sits between an upstream source (`input`)
/// and DML input, and its stream kind is the merge of both.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamDml {
    /// Plan base built by `StreamDml::new`; most properties are copied from `input`.
    pub base: PlanBase<Stream>,
    /// The single input of this unary node.
    input: PlanRef,
    /// Column descriptors; forwarded into the `DmlNode` and used for the column names
    /// shown by `Distill`.
    column_descs: Vec<ColumnDesc>,
}
32
33impl StreamDml {
34    // `append_only` indicates whether only `INSERT` is allowed.
35    pub fn new(input: PlanRef, append_only: bool, column_descs: Vec<ColumnDesc>) -> Self {
36        let base = PlanBase::new_stream(
37            input.ctx(),
38            input.schema().clone(),
39            input.stream_key().map(|v| v.to_vec()),
40            input.functional_dependency().clone(),
41            input.distribution().clone(),
42            input.stream_kind().merge(if append_only {
43                // For append-only table. Either there will be a `RowIdGen` following the `Dml` and `Union`,
44                // or there will be a `Materialize` with conflict handling enabled. In both cases there
45                // will be no key conflict, so we can treat the merged stream as append-only here.
46                StreamKind::AppendOnly
47            } else {
48                // We cannot guarantee that there's no conflict on stream key between upstream
49                // source and DML input, so we must treat it as upsert here.
50                StreamKind::Upsert
51            }),
52            false,                   // TODO(rc): decide EOWC property
53            WatermarkColumns::new(), // no watermark if dml is allowed
54            MonotonicityMap::new(),  // TODO: derive monotonicity
55        );
56
57        Self {
58            base,
59            input,
60            column_descs,
61        }
62    }
63
64    fn column_names(&self) -> Vec<&str> {
65        self.column_descs
66            .iter()
67            .map(|column_desc| column_desc.name.as_str())
68            .collect()
69    }
70}
71
72impl Distill for StreamDml {
73    fn distill<'a>(&self) -> XmlNode<'a> {
74        let col = self
75            .column_names()
76            .iter()
77            .map(|n| Pretty::from(n.to_string()))
78            .collect();
79        let col = Pretty::Array(col);
80        childless_record("StreamDml", vec![("columns", col)])
81    }
82}
83
84impl PlanTreeNodeUnary<Stream> for StreamDml {
85    fn input(&self) -> PlanRef {
86        self.input.clone()
87    }
88
89    fn clone_with_input(&self, input: PlanRef) -> Self {
90        Self::new(input, self.append_only(), self.column_descs.clone())
91    }
92}
93
// Invokes `impl_plan_tree_node_for_unary!` for `StreamDml` under the `Stream` convention.
// NOTE(review): the macro is defined elsewhere; presumably it derives the generic
// plan-tree-node impl from the `PlanTreeNodeUnary` impl — confirm against its definition.
impl_plan_tree_node_for_unary! { Stream, StreamDml}
95
96impl StreamNode for StreamDml {
97    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
98        use risingwave_pb::stream_plan::*;
99
100        PbNodeBody::Dml(Box::new(DmlNode {
101            table_id: 0,                                // Meta will fill this table id.
102            table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id.
103            column_descs: self.column_descs.iter().map(Into::into).collect(),
104            rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
105        }))
106    }
107}
108
// Empty impl: `StreamDml` overrides nothing and relies on the trait's default methods.
// NOTE(review): presumably because this node carries no expressions to rewrite — confirm.
impl ExprRewritable<Stream> for StreamDml {}
110
// Empty impl: `StreamDml` overrides nothing and relies on the trait's default methods.
// NOTE(review): presumably because this node carries no expressions to visit — confirm.
impl ExprVisitable for StreamDml {}