risingwave_frontend/optimizer/plan_node/
stream_project.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanNode;
25use crate::optimizer::property::{
26 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamProject {
35 pub base: PlanBase<Stream>,
36 core: generic::Project<PlanRef>,
37 watermark_derivations: Vec<(usize, usize)>,
40 nondecreasing_exprs: Vec<usize>,
42 noop_update_hint: bool,
45}
46
47impl Distill for StreamProject {
48 fn distill<'a>(&self) -> XmlNode<'a> {
49 let verbose = self.base.ctx().is_explain_verbose();
50
51 let schema = self.schema();
52 let mut vec = self.core.fields_pretty(schema);
53 if let Some(display_output_watermarks) =
54 watermark_pretty(self.base.watermark_columns(), schema)
55 {
56 vec.push(("output_watermarks", display_output_watermarks));
57 }
58 if verbose && self.noop_update_hint {
59 vec.push(("noop_update_hint", "true".into()));
60 }
61 childless_record("StreamProject", vec)
62 }
63}
64
65impl StreamProject {
66 pub fn new(core: generic::Project<PlanRef>) -> Self {
67 let noop_update_hint = core.likely_produces_noop_updates();
68 Self::new_inner(core, noop_update_hint)
69 }
70
71 pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
73 Self {
74 noop_update_hint,
75 ..self
76 }
77 }
78
79 fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
80 let ctx = core.ctx();
81 let input = core.input.clone();
82 let distribution = core
83 .i2o_col_mapping()
84 .rewrite_provided_distribution(input.distribution());
85
86 let mut watermark_derivations = vec![];
87 let mut nondecreasing_exprs = vec![];
88 let mut out_watermark_columns = WatermarkColumns::new();
89 let mut out_monotonicity_map = MonotonicityMap::new();
90 for (expr_idx, expr) in core.exprs.iter().enumerate() {
91 use monotonicity_variants::*;
92 match analyze_monotonicity(expr) {
93 Inherent(monotonicity) => {
94 out_monotonicity_map.insert(expr_idx, monotonicity);
95 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
96 nondecreasing_exprs.push(expr_idx);
99 out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
101 }
102 }
103 FollowingInput(input_idx) => {
104 let in_monotonicity = input.columns_monotonicity()[input_idx];
105 out_monotonicity_map.insert(expr_idx, in_monotonicity);
106 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
107 watermark_derivations.push((input_idx, expr_idx));
109 out_watermark_columns.insert(expr_idx, wtmk_group);
111 }
112 }
113 _FollowingInputInversely(_) => {}
114 }
115 }
116 let base = PlanBase::new_stream_with_core(
119 &core,
120 distribution,
121 input.append_only(),
122 input.emit_on_window_close(),
123 out_watermark_columns,
124 out_monotonicity_map,
125 );
126
127 StreamProject {
128 base,
129 core,
130 watermark_derivations,
131 nondecreasing_exprs,
132 noop_update_hint,
133 }
134 }
135
136 pub fn as_logical(&self) -> &generic::Project<PlanRef> {
137 &self.core
138 }
139
140 pub fn exprs(&self) -> &Vec<ExprImpl> {
141 &self.core.exprs
142 }
143
144 pub fn noop_update_hint(&self) -> bool {
145 self.noop_update_hint
146 }
147}
148
149impl PlanTreeNodeUnary for StreamProject {
150 fn input(&self) -> PlanRef {
151 self.core.input.clone()
152 }
153
154 fn clone_with_input(&self, input: PlanRef) -> Self {
155 let mut core = self.core.clone();
156 core.input = input;
157 Self::new_inner(core, self.noop_update_hint)
158 }
159}
160impl_plan_tree_node_for_unary! {StreamProject}
161
162impl StreamNode for StreamProject {
163 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
164 let (watermark_input_cols, watermark_output_cols) = self
165 .watermark_derivations
166 .iter()
167 .map(|(i, o)| (*i as u32, *o as u32))
168 .unzip();
169 PbNodeBody::Project(Box::new(ProjectNode {
170 select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
171 watermark_input_cols,
172 watermark_output_cols,
173 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
174 noop_update_hint: self.noop_update_hint,
175 }))
176 }
177}
178
179impl ExprRewritable for StreamProject {
180 fn has_rewritable_expr(&self) -> bool {
181 true
182 }
183
184 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
185 let mut core = self.core.clone();
186 core.rewrite_exprs(r);
187 Self::new_inner(core, self.noop_update_hint).into()
188 }
189}
190
191impl ExprVisitable for StreamProject {
192 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
193 self.core.visit_exprs(v);
194 }
195}