risingwave_frontend/optimizer/plan_node/
stream_project.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{
28 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
29};
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32use crate::utils::ColIndexMappingRewriteExt;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamProject {
38 pub base: PlanBase<Stream>,
39 core: generic::Project<PlanRef>,
40 watermark_derivations: Vec<(usize, usize)>,
43 nondecreasing_exprs: Vec<usize>,
45 noop_update_hint: bool,
48}
49
50impl Distill for StreamProject {
51 fn distill<'a>(&self) -> XmlNode<'a> {
52 let verbose = self.base.ctx().is_explain_verbose();
53
54 let schema = self.schema();
55 let mut vec = self.core.fields_pretty(schema);
56 if let Some(display_output_watermarks) =
57 watermark_pretty(self.base.watermark_columns(), schema)
58 {
59 vec.push(("output_watermarks", display_output_watermarks));
60 }
61 if verbose && self.noop_update_hint {
62 vec.push(("noop_update_hint", "true".into()));
63 }
64 childless_record("StreamProject", vec)
65 }
66}
67
68impl StreamProject {
69 pub fn new(core: generic::Project<PlanRef>) -> Self {
70 let noop_update_hint = core.likely_produces_noop_updates();
71 Self::new_inner(core, noop_update_hint)
72 }
73
74 pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
76 Self {
77 noop_update_hint,
78 ..self
79 }
80 }
81
82 fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
83 let ctx = core.ctx();
84 let input = core.input.clone();
85 let distribution = core
86 .i2o_col_mapping()
87 .rewrite_provided_distribution(input.distribution());
88
89 let mut watermark_derivations = vec![];
90 let mut nondecreasing_exprs = vec![];
91 let mut out_watermark_columns = WatermarkColumns::new();
92 let mut out_monotonicity_map = MonotonicityMap::new();
93 for (expr_idx, expr) in core.exprs.iter().enumerate() {
94 use monotonicity_variants::*;
95 match analyze_monotonicity(expr) {
96 Inherent(monotonicity) => {
97 out_monotonicity_map.insert(expr_idx, monotonicity);
98 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
99 nondecreasing_exprs.push(expr_idx);
102 out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
104 }
105 }
106 FollowingInput(input_idx) => {
107 let in_monotonicity = input.columns_monotonicity()[input_idx];
108 out_monotonicity_map.insert(expr_idx, in_monotonicity);
109 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
110 watermark_derivations.push((input_idx, expr_idx));
112 out_watermark_columns.insert(expr_idx, wtmk_group);
114 }
115 }
116 _FollowingInputInversely(_) => {}
117 }
118 }
119 let base = PlanBase::new_stream_with_core(
122 &core,
123 distribution,
124 input.stream_kind(),
125 input.emit_on_window_close(),
126 out_watermark_columns,
127 out_monotonicity_map,
128 );
129
130 StreamProject {
131 base,
132 core,
133 watermark_derivations,
134 nondecreasing_exprs,
135 noop_update_hint,
136 }
137 }
138
139 pub fn core(&self) -> &generic::Project<PlanRef> {
140 &self.core
141 }
142
143 pub fn exprs(&self) -> &Vec<ExprImpl> {
144 &self.core.exprs
145 }
146
147 pub fn noop_update_hint(&self) -> bool {
148 self.noop_update_hint
149 }
150}
151
152impl PlanTreeNodeUnary<Stream> for StreamProject {
153 fn input(&self) -> PlanRef {
154 self.core.input.clone()
155 }
156
157 fn clone_with_input(&self, input: PlanRef) -> Self {
158 let mut core = self.core.clone();
159 core.input = input;
160 Self::new_inner(core, self.noop_update_hint)
161 }
162}
163impl_plan_tree_node_for_unary! { Stream, StreamProject}
164
165impl TryToStreamPb for StreamProject {
166 fn try_to_stream_prost_body(
167 &self,
168 _state: &mut BuildFragmentGraphState,
169 ) -> SchedulerResult<PbNodeBody> {
170 let (watermark_input_cols, watermark_output_cols) = self
171 .watermark_derivations
172 .iter()
173 .map(|(i, o)| (*i as u32, *o as u32))
174 .unzip();
175 let select_list = self
176 .core
177 .exprs
178 .iter()
179 .map(|expr| {
180 expr.to_expr_proto_checked_pure(
181 self.input().stream_kind().is_retract(),
182 "SELECT list",
183 )
184 })
185 .collect::<crate::error::Result<Vec<_>>>()?;
186 Ok(PbNodeBody::Project(Box::new(ProjectNode {
187 select_list,
188 watermark_input_cols,
189 watermark_output_cols,
190 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
191 noop_update_hint: self.noop_update_hint,
192 })))
193 }
194}
195
196impl ExprRewritable<Stream> for StreamProject {
197 fn has_rewritable_expr(&self) -> bool {
198 true
199 }
200
201 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
202 let mut core = self.core.clone();
203 core.rewrite_exprs(r);
204 let noop_update_hint = self.noop_update_hint || core.likely_produces_noop_updates();
205 Self::new_inner(core, noop_update_hint).into()
206 }
207}
208
209impl ExprVisitable for StreamProject {
210 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
211 self.core.visit_exprs(v);
212 }
213}