risingwave_frontend/optimizer/plan_node/
stream_upstream_sink_union.rs1use educe::Educe;
16use pretty_xmlish::XmlNode;
17use risingwave_common::catalog::Schema;
18use risingwave_pb::stream_plan::UpstreamSinkUnionNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use crate::OptimizerContextRef;
23use crate::expr::ExprImpl;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
26use crate::optimizer::plan_node::{ExprRewritable, PlanBase, Stream, StreamNode};
27use crate::optimizer::property::{
28 Distribution, FunctionalDependencySet, MonotonicityDerivation, MonotonicityMap,
29 WatermarkColumns, analyze_monotonicity,
30};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, Educe)]
34#[educe(PartialEq, Eq, Hash)]
35struct UpstreamSinkUnionInner {
36 #[educe(PartialEq(ignore), Hash(ignore))]
37 ctx: OptimizerContextRef,
38 schema: Schema,
39 stream_key: Option<Vec<usize>>,
40 dist: Distribution,
41 stream_kind: StreamKind,
42 generated_column_exprs: Option<Vec<ExprImpl>>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
49pub struct StreamUpstreamSinkUnion {
50 pub base: PlanBase<Stream>,
51 pub generated_column_exprs: Option<Vec<ExprImpl>>,
52}
53
54impl StreamUpstreamSinkUnion {
55 pub fn new(
56 ctx: OptimizerContextRef,
57 schema: &Schema,
58 stream_key: Option<&[usize]>,
59 dist: Distribution,
60 append_only: bool,
61 user_defined_pk: bool,
62 generated_column_exprs: Option<Vec<ExprImpl>>,
63 ) -> Self {
64 let stream_kind = if append_only || !user_defined_pk {
67 StreamKind::AppendOnly
68 } else {
69 StreamKind::Upsert
70 };
71 let inner = UpstreamSinkUnionInner {
72 ctx,
73 schema: schema.clone(),
74 stream_key: stream_key.map(|keys| keys.to_vec()),
75 dist,
76 stream_kind,
77 generated_column_exprs,
78 };
79
80 Self::new_inner(inner)
81 }
82
83 fn new_inner(inner: UpstreamSinkUnionInner) -> Self {
84 let mut out_watermark_columns = WatermarkColumns::new();
85 let mut out_monotonicity_map = MonotonicityMap::new();
86 if let Some(ref generated_column_exprs) = inner.generated_column_exprs {
88 for (expr_idx, expr) in generated_column_exprs.iter().enumerate() {
89 if let MonotonicityDerivation::Inherent(monotonicity) = analyze_monotonicity(expr) {
90 out_monotonicity_map.insert(expr_idx, monotonicity);
91 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
92 out_watermark_columns.insert(expr_idx, inner.ctx.next_watermark_group_id());
93 }
94 }
95 }
96 }
97
98 let field_num = inner.schema.fields().len();
99 let base = PlanBase::new_stream(
100 inner.ctx,
101 inner.schema,
102 inner.stream_key, FunctionalDependencySet::new(field_num),
104 inner.dist,
105 inner.stream_kind,
106 false, out_watermark_columns,
108 out_monotonicity_map,
109 );
110
111 Self {
112 base,
113 generated_column_exprs: inner.generated_column_exprs,
114 }
115 }
116
117 fn rebuild_inner(&self) -> UpstreamSinkUnionInner {
118 UpstreamSinkUnionInner {
119 ctx: self.base.ctx(),
120 schema: self.base.schema().clone(),
121 stream_key: self.base.stream_key().map(|keys| keys.to_vec()),
122 dist: self.base.distribution().clone(),
123 stream_kind: self.base.stream_kind(),
124 generated_column_exprs: self.generated_column_exprs.clone(),
125 }
126 }
127}
128
129impl Distill for StreamUpstreamSinkUnion {
130 fn distill<'a>(&self) -> XmlNode<'a> {
131 let verbose = self.base.ctx().is_explain_verbose();
132 let mut vec = Vec::new();
133 if verbose && let Some(ow) = watermark_pretty(self.watermark_columns(), self.schema()) {
134 vec.push(("output_watermarks", ow));
135 }
136 childless_record("StreamUpstreamSinkUnion", vec)
137 }
138}
139
140impl_plan_tree_node_for_leaf! { Stream, StreamUpstreamSinkUnion }
141
142impl StreamNode for StreamUpstreamSinkUnion {
143 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
144 PbNodeBody::UpstreamSinkUnion(Box::new(UpstreamSinkUnionNode {
145 init_upstreams: vec![],
149 }))
150 }
151}
152
153impl ExprRewritable<Stream> for StreamUpstreamSinkUnion {
154 fn has_rewritable_expr(&self) -> bool {
155 self.generated_column_exprs.is_some()
156 }
157
158 fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> super::PlanRef<Stream> {
159 let mut inner = self.rebuild_inner();
160 inner.generated_column_exprs = inner
161 .generated_column_exprs
162 .map(|exprs| exprs.into_iter().map(|expr| r.rewrite_expr(expr)).collect());
163 Self::new_inner(inner).into()
164 }
165}
166
167impl ExprVisitable for StreamUpstreamSinkUnion {
168 fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
169 if let Some(exprs) = self.generated_column_exprs.as_ref() {
170 exprs.iter().for_each(|expr| {
171 v.visit_expr(expr);
172 });
173 }
174 }
175}