risingwave_frontend/optimizer/plan_node/
stream_project.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{
28 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
29};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31use crate::utils::ColIndexMappingRewriteExt;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamProject {
37 pub base: PlanBase<Stream>,
38 core: generic::Project<PlanRef>,
39 watermark_derivations: Vec<(usize, usize)>,
42 nondecreasing_exprs: Vec<usize>,
44 noop_update_hint: bool,
47}
48
49impl Distill for StreamProject {
50 fn distill<'a>(&self) -> XmlNode<'a> {
51 let verbose = self.base.ctx().is_explain_verbose();
52
53 let schema = self.schema();
54 let mut vec = self.core.fields_pretty(schema);
55 if let Some(display_output_watermarks) =
56 watermark_pretty(self.base.watermark_columns(), schema)
57 {
58 vec.push(("output_watermarks", display_output_watermarks));
59 }
60 if verbose && self.noop_update_hint {
61 vec.push(("noop_update_hint", "true".into()));
62 }
63 childless_record("StreamProject", vec)
64 }
65}
66
67impl StreamProject {
68 pub fn new(core: generic::Project<PlanRef>) -> Self {
69 let noop_update_hint = core.likely_produces_noop_updates();
70 Self::new_inner(core, noop_update_hint)
71 }
72
73 pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
75 Self {
76 noop_update_hint,
77 ..self
78 }
79 }
80
81 fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
82 let ctx = core.ctx();
83 let input = core.input.clone();
84 let distribution = core
85 .i2o_col_mapping()
86 .rewrite_provided_distribution(input.distribution());
87
88 let mut watermark_derivations = vec![];
89 let mut nondecreasing_exprs = vec![];
90 let mut out_watermark_columns = WatermarkColumns::new();
91 let mut out_monotonicity_map = MonotonicityMap::new();
92 for (expr_idx, expr) in core.exprs.iter().enumerate() {
93 use monotonicity_variants::*;
94 match analyze_monotonicity(expr) {
95 Inherent(monotonicity) => {
96 out_monotonicity_map.insert(expr_idx, monotonicity);
97 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
98 nondecreasing_exprs.push(expr_idx);
101 out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
103 }
104 }
105 FollowingInput(input_idx) => {
106 let in_monotonicity = input.columns_monotonicity()[input_idx];
107 out_monotonicity_map.insert(expr_idx, in_monotonicity);
108 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
109 watermark_derivations.push((input_idx, expr_idx));
111 out_watermark_columns.insert(expr_idx, wtmk_group);
113 }
114 }
115 _FollowingInputInversely(_) => {}
116 }
117 }
118 let base = PlanBase::new_stream_with_core(
121 &core,
122 distribution,
123 input.stream_kind(),
124 input.emit_on_window_close(),
125 out_watermark_columns,
126 out_monotonicity_map,
127 );
128
129 StreamProject {
130 base,
131 core,
132 watermark_derivations,
133 nondecreasing_exprs,
134 noop_update_hint,
135 }
136 }
137
138 pub fn as_logical(&self) -> &generic::Project<PlanRef> {
139 &self.core
140 }
141
142 pub fn exprs(&self) -> &Vec<ExprImpl> {
143 &self.core.exprs
144 }
145
146 pub fn noop_update_hint(&self) -> bool {
147 self.noop_update_hint
148 }
149}
150
151impl PlanTreeNodeUnary<Stream> for StreamProject {
152 fn input(&self) -> PlanRef {
153 self.core.input.clone()
154 }
155
156 fn clone_with_input(&self, input: PlanRef) -> Self {
157 let mut core = self.core.clone();
158 core.input = input;
159 Self::new_inner(core, self.noop_update_hint)
160 }
161}
162impl_plan_tree_node_for_unary! { Stream, StreamProject}
163
164impl StreamNode for StreamProject {
165 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
166 let (watermark_input_cols, watermark_output_cols) = self
167 .watermark_derivations
168 .iter()
169 .map(|(i, o)| (*i as u32, *o as u32))
170 .unzip();
171 PbNodeBody::Project(Box::new(ProjectNode {
172 select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
173 watermark_input_cols,
174 watermark_output_cols,
175 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
176 noop_update_hint: self.noop_update_hint,
177 }))
178 }
179}
180
181impl ExprRewritable<Stream> for StreamProject {
182 fn has_rewritable_expr(&self) -> bool {
183 true
184 }
185
186 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
187 let mut core = self.core.clone();
188 core.rewrite_exprs(r);
189 Self::new_inner(core, self.noop_update_hint).into()
190 }
191}
192
193impl ExprVisitable for StreamProject {
194 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
195 self.core.visit_exprs(v);
196 }
197}