risingwave_frontend/optimizer/plan_node/
stream_dml.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct StreamDml {
28 pub base: PlanBase<Stream>,
29 input: PlanRef,
30 column_descs: Vec<ColumnDesc>,
31}
32
33impl StreamDml {
34 pub fn new(input: PlanRef, append_only: bool, column_descs: Vec<ColumnDesc>) -> Self {
36 let base = PlanBase::new_stream(
37 input.ctx(),
38 input.schema().clone(),
39 input.stream_key().map(|v| v.to_vec()),
40 input.functional_dependency().clone(),
41 input.distribution().clone(),
42 input.stream_kind().merge(if append_only {
43 StreamKind::AppendOnly
47 } else {
48 StreamKind::Upsert
51 }),
52 false, WatermarkColumns::new(), MonotonicityMap::new(), );
56
57 Self {
58 base,
59 input,
60 column_descs,
61 }
62 }
63
64 fn column_names(&self) -> Vec<&str> {
65 self.column_descs
66 .iter()
67 .map(|column_desc| column_desc.name.as_str())
68 .collect()
69 }
70}
71
72impl Distill for StreamDml {
73 fn distill<'a>(&self) -> XmlNode<'a> {
74 let col = self
75 .column_names()
76 .iter()
77 .map(|n| Pretty::from(n.to_string()))
78 .collect();
79 let col = Pretty::Array(col);
80 childless_record("StreamDml", vec![("columns", col)])
81 }
82}
83
84impl PlanTreeNodeUnary<Stream> for StreamDml {
85 fn input(&self) -> PlanRef {
86 self.input.clone()
87 }
88
89 fn clone_with_input(&self, input: PlanRef) -> Self {
90 Self::new(input, self.append_only(), self.column_descs.clone())
91 }
92}
93
94impl_plan_tree_node_for_unary! { Stream, StreamDml}
95
96impl StreamNode for StreamDml {
97 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
98 use risingwave_pb::stream_plan::*;
99
100 PbNodeBody::Dml(Box::new(DmlNode {
101 table_id: 0, table_version_id: INITIAL_TABLE_VERSION_ID, column_descs: self.column_descs.iter().map(Into::into).collect(),
104 rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
105 }))
106 }
107}
108
109impl ExprRewritable<Stream> for StreamDml {}
110
111impl ExprVisitable for StreamDml {}