risingwave_frontend/optimizer/plan_node/
stream_project_set.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_pb::stream_plan::ProjectSetNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
22};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{
26    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
27};
28use crate::scheduler::SchedulerResult;
29use crate::stream_fragmenter::BuildFragmentGraphState;
30use crate::utils::ColIndexMappingRewriteExt;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamProjectSet {
34    pub base: PlanBase<Stream>,
35    core: generic::ProjectSet<PlanRef>,
36    /// All the watermark derivations, (`input_column_idx`, `expr_idx`). And the
37    /// derivation expression is the `project_set`'s expression itself.
38    watermark_derivations: Vec<(usize, usize)>,
39    /// Nondecreasing expression indices. `ProjectSet` can produce watermarks for these
40    /// expressions.
41    nondecreasing_exprs: Vec<usize>,
42}
43
44impl StreamProjectSet {
45    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
46        let ctx = core.input.ctx();
47        let input = core.input.clone();
48        let distribution = core
49            .i2o_col_mapping()
50            .rewrite_provided_distribution(input.distribution());
51
52        let mut watermark_derivations = vec![];
53        let mut nondecreasing_exprs = vec![];
54        let mut out_watermark_columns = WatermarkColumns::new();
55        for (expr_idx, expr) in core.select_list.iter().enumerate() {
56            let out_expr_idx = expr_idx + 1;
57
58            use monotonicity_variants::*;
59            match analyze_monotonicity(expr) {
60                Inherent(monotonicity) => {
61                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
62                        // TODO(rc): may be we should also derive watermark for constant later
63                        // FIXME(rc): we need to check expr is not table function
64                        // to produce watermarks
65                        nondecreasing_exprs.push(expr_idx);
66                        // each inherently non-decreasing expr creates a new watermark group
67                        out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
68                    }
69                }
70                FollowingInput(input_idx) => {
71                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
72                        // to propagate watermarks
73                        watermark_derivations.push((input_idx, expr_idx));
74                        // join an existing watermark group
75                        out_watermark_columns.insert(out_expr_idx, wtmk_group);
76                    }
77                }
78                _FollowingInputInversely(_) => {}
79            }
80        }
81
82        // ProjectSet executor won't change the append-only behavior of the stream, so it depends on
83        // input's `append_only`.
84        let base = PlanBase::new_stream_with_core(
85            &core,
86            distribution,
87            input.stream_kind(),
88            input.emit_on_window_close(),
89            out_watermark_columns,
90            MonotonicityMap::new(), // TODO: derive monotonicity
91        );
92        StreamProjectSet {
93            base,
94            core,
95            watermark_derivations,
96            nondecreasing_exprs,
97        }
98    }
99}
100impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
101impl_plan_tree_node_for_unary! { Stream, StreamProjectSet }
102
103impl PlanTreeNodeUnary<Stream> for StreamProjectSet {
104    fn input(&self) -> PlanRef {
105        self.core.input.clone()
106    }
107
108    fn clone_with_input(&self, input: PlanRef) -> Self {
109        let mut core = self.core.clone();
110        core.input = input;
111        Self::new(core)
112    }
113}
114
115impl TryToStreamPb for StreamProjectSet {
116    fn try_to_stream_prost_body(
117        &self,
118        _state: &mut BuildFragmentGraphState,
119    ) -> SchedulerResult<PbNodeBody> {
120        let (watermark_input_cols, watermark_expr_indices) = self
121            .watermark_derivations
122            .iter()
123            .map(|(i, o)| (*i as u32, *o as u32))
124            .unzip();
125        let select_list = self
126            .core
127            .select_list
128            .iter()
129            .map(|select_item| {
130                select_item.to_project_set_select_item_proto_checked_pure(
131                    self.input().stream_kind().is_retract(),
132                )
133            })
134            .collect::<crate::error::Result<Vec<_>>>()?;
135        Ok(PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
136            select_list,
137            watermark_input_cols,
138            watermark_expr_indices,
139            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
140        })))
141    }
142}
143
144impl ExprRewritable<Stream> for StreamProjectSet {
145    fn has_rewritable_expr(&self) -> bool {
146        true
147    }
148
149    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
150        let mut core = self.core.clone();
151        core.rewrite_exprs(r);
152        Self::new(core).into()
153    }
154}
155
156impl ExprVisitable for StreamProjectSet {
157    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
158        self.core.visit_exprs(v);
159    }
160}