risingwave_frontend/optimizer/plan_node/
stream_project.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanNode;
25use crate::optimizer::property::{
26    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::ColIndexMappingRewriteExt;
30
31/// `StreamProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
32/// rows.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamProject {
35    pub base: PlanBase<Stream>,
36    core: generic::Project<PlanRef>,
37    /// All the watermark derivations, (`input_column_index`, `output_column_index`). And the
38    /// derivation expression is the project's expression itself.
39    watermark_derivations: Vec<(usize, usize)>,
40    /// Nondecreasing expression indices. `Project` can produce watermarks for these expressions.
41    nondecreasing_exprs: Vec<usize>,
42    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
43    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
44    noop_update_hint: bool,
45}
46
47impl Distill for StreamProject {
48    fn distill<'a>(&self) -> XmlNode<'a> {
49        let verbose = self.base.ctx().is_explain_verbose();
50
51        let schema = self.schema();
52        let mut vec = self.core.fields_pretty(schema);
53        if let Some(display_output_watermarks) =
54            watermark_pretty(self.base.watermark_columns(), schema)
55        {
56            vec.push(("output_watermarks", display_output_watermarks));
57        }
58        if verbose && self.noop_update_hint {
59            vec.push(("noop_update_hint", "true".into()));
60        }
61        childless_record("StreamProject", vec)
62    }
63}
64
65impl StreamProject {
66    pub fn new(core: generic::Project<PlanRef>) -> Self {
67        let noop_update_hint = core.likely_produces_noop_updates();
68        Self::new_inner(core, noop_update_hint)
69    }
70
71    /// Set the `noop_update_hint` flag to the given value.
72    pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
73        Self {
74            noop_update_hint,
75            ..self
76        }
77    }
78
79    fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
80        let ctx = core.ctx();
81        let input = core.input.clone();
82        let distribution = core
83            .i2o_col_mapping()
84            .rewrite_provided_distribution(input.distribution());
85
86        let mut watermark_derivations = vec![];
87        let mut nondecreasing_exprs = vec![];
88        let mut out_watermark_columns = WatermarkColumns::new();
89        let mut out_monotonicity_map = MonotonicityMap::new();
90        for (expr_idx, expr) in core.exprs.iter().enumerate() {
91            use monotonicity_variants::*;
92            match analyze_monotonicity(expr) {
93                Inherent(monotonicity) => {
94                    out_monotonicity_map.insert(expr_idx, monotonicity);
95                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
96                        // TODO(rc): may be we should also derive watermark for constant later
97                        // to produce watermarks
98                        nondecreasing_exprs.push(expr_idx);
99                        // each inherently non-decreasing expr creates a new watermark group
100                        out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
101                    }
102                }
103                FollowingInput(input_idx) => {
104                    let in_monotonicity = input.columns_monotonicity()[input_idx];
105                    out_monotonicity_map.insert(expr_idx, in_monotonicity);
106                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
107                        // to propagate watermarks
108                        watermark_derivations.push((input_idx, expr_idx));
109                        // join an existing watermark group
110                        out_watermark_columns.insert(expr_idx, wtmk_group);
111                    }
112                }
113                _FollowingInputInversely(_) => {}
114            }
115        }
116        // Project executor won't change the append-only behavior of the stream, so it depends on
117        // input's `append_only`.
118        let base = PlanBase::new_stream_with_core(
119            &core,
120            distribution,
121            input.append_only(),
122            input.emit_on_window_close(),
123            out_watermark_columns,
124            out_monotonicity_map,
125        );
126
127        StreamProject {
128            base,
129            core,
130            watermark_derivations,
131            nondecreasing_exprs,
132            noop_update_hint,
133        }
134    }
135
136    pub fn as_logical(&self) -> &generic::Project<PlanRef> {
137        &self.core
138    }
139
140    pub fn exprs(&self) -> &Vec<ExprImpl> {
141        &self.core.exprs
142    }
143
144    pub fn noop_update_hint(&self) -> bool {
145        self.noop_update_hint
146    }
147}
148
149impl PlanTreeNodeUnary for StreamProject {
150    fn input(&self) -> PlanRef {
151        self.core.input.clone()
152    }
153
154    fn clone_with_input(&self, input: PlanRef) -> Self {
155        let mut core = self.core.clone();
156        core.input = input;
157        Self::new_inner(core, self.noop_update_hint)
158    }
159}
160impl_plan_tree_node_for_unary! {StreamProject}
161
162impl StreamNode for StreamProject {
163    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
164        let (watermark_input_cols, watermark_output_cols) = self
165            .watermark_derivations
166            .iter()
167            .map(|(i, o)| (*i as u32, *o as u32))
168            .unzip();
169        PbNodeBody::Project(Box::new(ProjectNode {
170            select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
171            watermark_input_cols,
172            watermark_output_cols,
173            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
174            noop_update_hint: self.noop_update_hint,
175        }))
176    }
177}
178
179impl ExprRewritable for StreamProject {
180    fn has_rewritable_expr(&self) -> bool {
181        true
182    }
183
184    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
185        let mut core = self.core.clone();
186        core.rewrite_exprs(r);
187        Self::new_inner(core, self.noop_update_hint).into()
188    }
189}
190
191impl ExprVisitable for StreamProject {
192    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
193        self.core.visit_exprs(v);
194    }
195}