risingwave_frontend/optimizer/plan_node/
stream_project.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ProjectNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{
28    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
29};
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32use crate::utils::ColIndexMappingRewriteExt;
33
34/// `StreamProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
35/// rows.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamProject {
38    pub base: PlanBase<Stream>,
39    core: generic::Project<PlanRef>,
40    /// All the watermark derivations, (`input_column_index`, `output_column_index`). And the
41    /// derivation expression is the project's expression itself.
42    watermark_derivations: Vec<(usize, usize)>,
43    /// Nondecreasing expression indices. `Project` can produce watermarks for these expressions.
44    nondecreasing_exprs: Vec<usize>,
45    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
46    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
47    noop_update_hint: bool,
48}
49
50impl Distill for StreamProject {
51    fn distill<'a>(&self) -> XmlNode<'a> {
52        let verbose = self.base.ctx().is_explain_verbose();
53
54        let schema = self.schema();
55        let mut vec = self.core.fields_pretty(schema);
56        if let Some(display_output_watermarks) =
57            watermark_pretty(self.base.watermark_columns(), schema)
58        {
59            vec.push(("output_watermarks", display_output_watermarks));
60        }
61        if verbose && self.noop_update_hint {
62            vec.push(("noop_update_hint", "true".into()));
63        }
64        childless_record("StreamProject", vec)
65    }
66}
67
68impl StreamProject {
69    pub fn new(core: generic::Project<PlanRef>) -> Self {
70        let noop_update_hint = core.likely_produces_noop_updates();
71        Self::new_inner(core, noop_update_hint)
72    }
73
74    /// Set the `noop_update_hint` flag to the given value.
75    pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self {
76        Self {
77            noop_update_hint,
78            ..self
79        }
80    }
81
82    fn new_inner(core: generic::Project<PlanRef>, noop_update_hint: bool) -> Self {
83        let ctx = core.ctx();
84        let input = core.input.clone();
85        let distribution = core
86            .i2o_col_mapping()
87            .rewrite_provided_distribution(input.distribution());
88
89        let mut watermark_derivations = vec![];
90        let mut nondecreasing_exprs = vec![];
91        let mut out_watermark_columns = WatermarkColumns::new();
92        let mut out_monotonicity_map = MonotonicityMap::new();
93        for (expr_idx, expr) in core.exprs.iter().enumerate() {
94            use monotonicity_variants::*;
95            match analyze_monotonicity(expr) {
96                Inherent(monotonicity) => {
97                    out_monotonicity_map.insert(expr_idx, monotonicity);
98                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
99                        // TODO(rc): may be we should also derive watermark for constant later
100                        // to produce watermarks
101                        nondecreasing_exprs.push(expr_idx);
102                        // each inherently non-decreasing expr creates a new watermark group
103                        out_watermark_columns.insert(expr_idx, ctx.next_watermark_group_id());
104                    }
105                }
106                FollowingInput(input_idx) => {
107                    let in_monotonicity = input.columns_monotonicity()[input_idx];
108                    out_monotonicity_map.insert(expr_idx, in_monotonicity);
109                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
110                        // to propagate watermarks
111                        watermark_derivations.push((input_idx, expr_idx));
112                        // join an existing watermark group
113                        out_watermark_columns.insert(expr_idx, wtmk_group);
114                    }
115                }
116                _FollowingInputInversely(_) => {}
117            }
118        }
119        // Project executor won't change the append-only behavior of the stream, so it depends on
120        // input's `append_only`.
121        let base = PlanBase::new_stream_with_core(
122            &core,
123            distribution,
124            input.stream_kind(),
125            input.emit_on_window_close(),
126            out_watermark_columns,
127            out_monotonicity_map,
128        );
129
130        StreamProject {
131            base,
132            core,
133            watermark_derivations,
134            nondecreasing_exprs,
135            noop_update_hint,
136        }
137    }
138
139    pub fn core(&self) -> &generic::Project<PlanRef> {
140        &self.core
141    }
142
143    pub fn exprs(&self) -> &Vec<ExprImpl> {
144        &self.core.exprs
145    }
146
147    pub fn noop_update_hint(&self) -> bool {
148        self.noop_update_hint
149    }
150}
151
152impl PlanTreeNodeUnary<Stream> for StreamProject {
153    fn input(&self) -> PlanRef {
154        self.core.input.clone()
155    }
156
157    fn clone_with_input(&self, input: PlanRef) -> Self {
158        let mut core = self.core.clone();
159        core.input = input;
160        Self::new_inner(core, self.noop_update_hint)
161    }
162}
163impl_plan_tree_node_for_unary! { Stream, StreamProject}
164
165impl TryToStreamPb for StreamProject {
166    fn try_to_stream_prost_body(
167        &self,
168        _state: &mut BuildFragmentGraphState,
169    ) -> SchedulerResult<PbNodeBody> {
170        let (watermark_input_cols, watermark_output_cols) = self
171            .watermark_derivations
172            .iter()
173            .map(|(i, o)| (*i as u32, *o as u32))
174            .unzip();
175        let select_list = self
176            .core
177            .exprs
178            .iter()
179            .map(|expr| {
180                expr.to_expr_proto_checked_pure(
181                    self.input().stream_kind().is_retract(),
182                    "SELECT list",
183                )
184            })
185            .collect::<crate::error::Result<Vec<_>>>()?;
186        Ok(PbNodeBody::Project(Box::new(ProjectNode {
187            select_list,
188            watermark_input_cols,
189            watermark_output_cols,
190            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
191            noop_update_hint: self.noop_update_hint,
192        })))
193    }
194}
195
196impl ExprRewritable<Stream> for StreamProject {
197    fn has_rewritable_expr(&self) -> bool {
198        true
199    }
200
201    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
202        let mut core = self.core.clone();
203        core.rewrite_exprs(r);
204        let noop_update_hint = self.noop_update_hint || core.likely_produces_noop_updates();
205        Self::new_inner(core, noop_update_hint).into()
206    }
207}
208
209impl ExprVisitable for StreamProject {
210    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
211        self.core.visit_exprs(v);
212    }
213}