risingwave_frontend/optimizer/plan_node/
stream_dml.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct StreamDml {
28    pub base: PlanBase<Stream>,
29    input: PlanRef,
30    column_descs: Vec<ColumnDesc>,
31}
32
33impl StreamDml {
34    pub fn new(input: PlanRef, append_only: bool, column_descs: Vec<ColumnDesc>) -> Self {
35        let base = PlanBase::new_stream(
36            input.ctx(),
37            input.schema().clone(),
38            input.stream_key().map(|v| v.to_vec()),
39            input.functional_dependency().clone(),
40            input.distribution().clone(),
41            append_only,
42            false,                   // TODO(rc): decide EOWC property
43            WatermarkColumns::new(), // no watermark if dml is allowed
44            MonotonicityMap::new(),  // TODO: derive monotonicity
45        );
46
47        Self {
48            base,
49            input,
50            column_descs,
51        }
52    }
53
54    fn column_names(&self) -> Vec<&str> {
55        self.column_descs
56            .iter()
57            .map(|column_desc| column_desc.name.as_str())
58            .collect()
59    }
60}
61
62impl Distill for StreamDml {
63    fn distill<'a>(&self) -> XmlNode<'a> {
64        let col = self
65            .column_names()
66            .iter()
67            .map(|n| Pretty::from(n.to_string()))
68            .collect();
69        let col = Pretty::Array(col);
70        childless_record("StreamDml", vec![("columns", col)])
71    }
72}
73
74impl PlanTreeNodeUnary for StreamDml {
75    fn input(&self) -> PlanRef {
76        self.input.clone()
77    }
78
79    fn clone_with_input(&self, input: PlanRef) -> Self {
80        Self::new(input, self.append_only(), self.column_descs.clone())
81    }
82}
83
84impl_plan_tree_node_for_unary! {StreamDml}
85
86impl StreamNode for StreamDml {
87    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
88        use risingwave_pb::stream_plan::*;
89
90        PbNodeBody::Dml(Box::new(DmlNode {
91            table_id: 0,                                // Meta will fill this table id.
92            table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id.
93            column_descs: self.column_descs.iter().map(Into::into).collect(),
94            rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
95        }))
96    }
97}
98
99impl ExprRewritable for StreamDml {}
100
101impl ExprVisitable for StreamDml {}