risingwave_frontend/optimizer/plan_node/
stream_project_set.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::stream_plan::ProjectSetNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{
27    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
28};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30use crate::utils::ColIndexMappingRewriteExt;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamProjectSet {
34    pub base: PlanBase<Stream>,
35    core: generic::ProjectSet<PlanRef>,
36    /// All the watermark derivations, (`input_column_idx`, `expr_idx`). And the
37    /// derivation expression is the `project_set`'s expression itself.
38    watermark_derivations: Vec<(usize, usize)>,
39    /// Nondecreasing expression indices. `ProjectSet` can produce watermarks for these
40    /// expressions.
41    nondecreasing_exprs: Vec<usize>,
42}
43
44impl StreamProjectSet {
45    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
46        let ctx = core.input.ctx();
47        let input = core.input.clone();
48        let distribution = core
49            .i2o_col_mapping()
50            .rewrite_provided_distribution(input.distribution());
51
52        let mut watermark_derivations = vec![];
53        let mut nondecreasing_exprs = vec![];
54        let mut out_watermark_columns = WatermarkColumns::new();
55        for (expr_idx, expr) in core.select_list.iter().enumerate() {
56            let out_expr_idx = expr_idx + 1;
57
58            use monotonicity_variants::*;
59            match analyze_monotonicity(expr) {
60                Inherent(monotonicity) => {
61                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
62                        // TODO(rc): may be we should also derive watermark for constant later
63                        // FIXME(rc): we need to check expr is not table function
64                        // to produce watermarks
65                        nondecreasing_exprs.push(expr_idx);
66                        // each inherently non-decreasing expr creates a new watermark group
67                        out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
68                    }
69                }
70                FollowingInput(input_idx) => {
71                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
72                        // to propagate watermarks
73                        watermark_derivations.push((input_idx, expr_idx));
74                        // join an existing watermark group
75                        out_watermark_columns.insert(out_expr_idx, wtmk_group);
76                    }
77                }
78                _FollowingInputInversely(_) => {}
79            }
80        }
81
82        // ProjectSet executor won't change the append-only behavior of the stream, so it depends on
83        // input's `append_only`.
84        let base = PlanBase::new_stream_with_core(
85            &core,
86            distribution,
87            input.stream_kind(),
88            input.emit_on_window_close(),
89            out_watermark_columns,
90            MonotonicityMap::new(), // TODO: derive monotonicity
91        );
92        StreamProjectSet {
93            base,
94            core,
95            watermark_derivations,
96            nondecreasing_exprs,
97        }
98    }
99}
100impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
101impl_plan_tree_node_for_unary! { Stream, StreamProjectSet }
102
103impl PlanTreeNodeUnary<Stream> for StreamProjectSet {
104    fn input(&self) -> PlanRef {
105        self.core.input.clone()
106    }
107
108    fn clone_with_input(&self, input: PlanRef) -> Self {
109        let mut core = self.core.clone();
110        core.input = input;
111        Self::new(core)
112    }
113}
114
115impl StreamNode for StreamProjectSet {
116    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
117        let (watermark_input_cols, watermark_expr_indices) = self
118            .watermark_derivations
119            .iter()
120            .map(|(i, o)| (*i as u32, *o as u32))
121            .unzip();
122        PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
123            select_list: self
124                .core
125                .select_list
126                .iter()
127                .map(|select_item| select_item.to_project_set_select_item_proto())
128                .collect_vec(),
129            watermark_input_cols,
130            watermark_expr_indices,
131            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
132        }))
133    }
134}
135
136impl ExprRewritable<Stream> for StreamProjectSet {
137    fn has_rewritable_expr(&self) -> bool {
138        true
139    }
140
141    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
142        let mut core = self.core.clone();
143        core.rewrite_exprs(r);
144        Self::new(core).into()
145    }
146}
147
148impl ExprVisitable for StreamProjectSet {
149    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
150        self.core.visit_exprs(v);
151    }
152}