risingwave_frontend/optimizer/plan_node/
stream_dml.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct StreamDml {
28 pub base: PlanBase<Stream>,
29 input: PlanRef,
30 column_descs: Vec<ColumnDesc>,
31}
32
33impl StreamDml {
34 pub fn new(input: PlanRef, append_only: bool, column_descs: Vec<ColumnDesc>) -> Self {
35 let base = PlanBase::new_stream(
36 input.ctx(),
37 input.schema().clone(),
38 input.stream_key().map(|v| v.to_vec()),
39 input.functional_dependency().clone(),
40 input.distribution().clone(),
41 append_only,
42 false, WatermarkColumns::new(), MonotonicityMap::new(), );
46
47 Self {
48 base,
49 input,
50 column_descs,
51 }
52 }
53
54 fn column_names(&self) -> Vec<&str> {
55 self.column_descs
56 .iter()
57 .map(|column_desc| column_desc.name.as_str())
58 .collect()
59 }
60}
61
62impl Distill for StreamDml {
63 fn distill<'a>(&self) -> XmlNode<'a> {
64 let col = self
65 .column_names()
66 .iter()
67 .map(|n| Pretty::from(n.to_string()))
68 .collect();
69 let col = Pretty::Array(col);
70 childless_record("StreamDml", vec![("columns", col)])
71 }
72}
73
74impl PlanTreeNodeUnary for StreamDml {
75 fn input(&self) -> PlanRef {
76 self.input.clone()
77 }
78
79 fn clone_with_input(&self, input: PlanRef) -> Self {
80 Self::new(input, self.append_only(), self.column_descs.clone())
81 }
82}
83
84impl_plan_tree_node_for_unary! {StreamDml}
85
86impl StreamNode for StreamDml {
87 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
88 use risingwave_pb::stream_plan::*;
89
90 PbNodeBody::Dml(Box::new(DmlNode {
91 table_id: 0, table_version_id: INITIAL_TABLE_VERSION_ID, column_descs: self.column_descs.iter().map(Into::into).collect(),
94 rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
95 }))
96 }
97}
98
99impl ExprRewritable for StreamDml {}
100
101impl ExprVisitable for StreamDml {}