risingwave_frontend/optimizer/plan_node/stream_project.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{
28    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
29};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31use crate::utils::ColIndexMappingRewriteExt;
32
/// `StreamProject` implements [`super::LogicalProject`] in the streaming plan: it evaluates the
/// specified expressions on input rows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamProject {
    /// Stream plan-node properties (distribution, stream kind, watermark columns, column
    /// monotonicity, ...) derived from `core` and its input when the node is constructed.
    pub base: PlanBase<Stream>,
    /// The underlying generic projection: the input plan plus the expressions to evaluate.
    core: generic::Project<PlanRef>,
    /// All the watermark derivations, as (`input_column_index`, `output_column_index`) pairs.
    /// The derivation expression is the projection's own expression at the output index.
    watermark_derivations: Vec<(usize, usize)>,
    /// Indices of the expressions that are inherently non-decreasing (excluding constants).
    /// `Project` can produce watermarks for these expressions.
    nondecreasing_exprs: Vec<usize>,
    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
    noop_update_hint: bool,
}
48
49impl Distill for StreamProject {
50    fn distill<'a>(&self) -> XmlNode<'a> {
51        let verbose = self.base.ctx().is_explain_verbose();
52
53        let schema = self.schema();
54        let mut vec = self.core.fields_pretty(schema);
55        if let Some(display_output_watermarks) =
56            watermark_pretty(self.base.watermark_columns(), schema)
57        {
58            vec.push(("output_watermarks", display_output_watermarks));
59        }
60        if verbose && self.noop_update_hint {
61            vec.push(("noop_update_hint", "true".into()));
62        }
63        childless_record("StreamProject", vec)
64    }
65}
66
67impl StreamProject {
68    pub fn new(core: generic::Project<PlanRef>) -> Self {
69        let noop_update_hint = core.likely_produces_noop_updates();
70        Self::new_inner(core, noop_update_hint)
71    }
72
73    /// Set the `noop_update_hint` flag to the given value.
74    pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
75        Self {
76            noop_update_hint,
77            ..self
78        }
79    }
80
81    fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
82        let ctx = core.ctx();
83        let input = core.input.clone();
84        let distribution = core
85            .i2o_col_mapping()
86            .rewrite_provided_distribution(input.distribution());
87
88        let mut watermark_derivations = vec![];
89        let mut nondecreasing_exprs = vec![];
90        let mut out_watermark_columns = WatermarkColumns::new();
91        let mut out_monotonicity_map = MonotonicityMap::new();
92        for (expr_idx, expr) in core.exprs.iter().enumerate() {
93            use monotonicity_variants::*;
94            match analyze_monotonicity(expr) {
95                Inherent(monotonicity) => {
96                    out_monotonicity_map.insert(expr_idx, monotonicity);
97                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
98                        // TODO(rc): may be we should also derive watermark for constant later
99                        // to produce watermarks
100                        nondecreasing_exprs.push(expr_idx);
101                        // each inherently non-decreasing expr creates a new watermark group
102                        out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
103                    }
104                }
105                FollowingInput(input_idx) => {
106                    let in_monotonicity = input.columns_monotonicity()[input_idx];
107                    out_monotonicity_map.insert(expr_idx, in_monotonicity);
108                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
109                        // to propagate watermarks
110                        watermark_derivations.push((input_idx, expr_idx));
111                        // join an existing watermark group
112                        out_watermark_columns.insert(expr_idx, wtmk_group);
113                    }
114                }
115                _FollowingInputInversely(_) => {}
116            }
117        }
118        // Project executor won't change the append-only behavior of the stream, so it depends on
119        // input's `append_only`.
120        let base = PlanBase::new_stream_with_core(
121            &core,
122            distribution,
123            input.stream_kind(),
124            input.emit_on_window_close(),
125            out_watermark_columns,
126            out_monotonicity_map,
127        );
128
129        StreamProject {
130            base,
131            core,
132            watermark_derivations,
133            nondecreasing_exprs,
134            noop_update_hint,
135        }
136    }
137
138    pub fn as_logical(&self) -> &generic::Project<PlanRef> {
139        &self.core
140    }
141
142    pub fn exprs(&self) -> &Vec<ExprImpl> {
143        &self.core.exprs
144    }
145
146    pub fn noop_update_hint(&self) -> bool {
147        self.noop_update_hint
148    }
149}
150
151impl PlanTreeNodeUnary<Stream> for StreamProject {
152    fn input(&self) -> PlanRef {
153        self.core.input.clone()
154    }
155
156    fn clone_with_input(&self, input: PlanRef) -> Self {
157        let mut core = self.core.clone();
158        core.input = input;
159        Self::new_inner(core, self.noop_update_hint)
160    }
161}
162impl_plan_tree_node_for_unary! { Stream, StreamProject}
163
164impl StreamNode for StreamProject {
165    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
166        let (watermark_input_cols, watermark_output_cols) = self
167            .watermark_derivations
168            .iter()
169            .map(|(i, o)| (*i as u32, *o as u32))
170            .unzip();
171        PbNodeBody::Project(Box::new(ProjectNode {
172            select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
173            watermark_input_cols,
174            watermark_output_cols,
175            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
176            noop_update_hint: self.noop_update_hint,
177        }))
178    }
179}
180
181impl ExprRewritable<Stream> for StreamProject {
182    fn has_rewritable_expr(&self) -> bool {
183        true
184    }
185
186    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
187        let mut core = self.core.clone();
188        core.rewrite_exprs(r);
189        Self::new_inner(core, self.noop_update_hint).into()
190    }
191}
192
193impl ExprVisitable for StreamProject {
194    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
195        self.core.visit_exprs(v);
196    }
197}