risingwave_frontend/optimizer/plan_node/stream_project_set.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::stream_plan::ProjectSetNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::{
25    MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
26};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::ColIndexMappingRewriteExt;
29
/// Stream plan node wrapping a [`generic::ProjectSet`].
///
/// Besides the generic core, it caches the watermark information computed once in
/// [`StreamProjectSet::new`]. That information is later serialized into the `ProjectSetNode`
/// protobuf by `to_stream_prost_body`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamProjectSet {
    /// Common stream plan properties (distribution, append-only, watermark columns, ...).
    pub base: PlanBase<Stream>,
    /// The generic `ProjectSet` definition: the input plan and the select list.
    core: generic::ProjectSet<PlanRef>,
    /// All the watermark derivations, as (`input_column_idx`, `expr_idx`) pairs. The watermark
    /// of input column `input_column_idx` is propagated through select-list expression
    /// `expr_idx`. The derivation expression is the `project_set`'s expression itself.
    watermark_derivations: Vec<(usize, usize)>,
    /// Select-list indices of inherently non-decreasing (and non-constant) expressions.
    /// `ProjectSet` can produce watermarks for these expressions.
    nondecreasing_exprs: Vec<usize>,
}
41
42impl StreamProjectSet {
43    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
44        let ctx = core.input.ctx();
45        let input = core.input.clone();
46        let distribution = core
47            .i2o_col_mapping()
48            .rewrite_provided_distribution(input.distribution());
49
50        let mut watermark_derivations = vec![];
51        let mut nondecreasing_exprs = vec![];
52        let mut out_watermark_columns = WatermarkColumns::new();
53        for (expr_idx, expr) in core.select_list.iter().enumerate() {
54            let out_expr_idx = expr_idx + 1;
55
56            use monotonicity_variants::*;
57            match analyze_monotonicity(expr) {
58                Inherent(monotonicity) => {
59                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
60                        // TODO(rc): may be we should also derive watermark for constant later
61                        // FIXME(rc): we need to check expr is not table function
62                        // to produce watermarks
63                        nondecreasing_exprs.push(expr_idx);
64                        // each inherently non-decreasing expr creates a new watermark group
65                        out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
66                    }
67                }
68                FollowingInput(input_idx) => {
69                    if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
70                        // to propagate watermarks
71                        watermark_derivations.push((input_idx, expr_idx));
72                        // join an existing watermark group
73                        out_watermark_columns.insert(out_expr_idx, wtmk_group);
74                    }
75                }
76                _FollowingInputInversely(_) => {}
77            }
78        }
79
80        // ProjectSet executor won't change the append-only behavior of the stream, so it depends on
81        // input's `append_only`.
82        let base = PlanBase::new_stream_with_core(
83            &core,
84            distribution,
85            input.append_only(),
86            input.emit_on_window_close(),
87            out_watermark_columns,
88            MonotonicityMap::new(), // TODO: derive monotonicity
89        );
90        StreamProjectSet {
91            base,
92            core,
93            watermark_derivations,
94            nondecreasing_exprs,
95        }
96    }
97}
// Plan rendering ("distill") is delegated to the generic `core` under the label
// "StreamProjectSet" (presumably what EXPLAIN shows — confirm against the macro definition).
impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
// Generic plan-tree-node plumbing for a single-input node; presumably built on the
// `PlanTreeNodeUnary` impl of this type (confirm against the macro definition).
impl_plan_tree_node_for_unary! { StreamProjectSet }
100
101impl PlanTreeNodeUnary for StreamProjectSet {
102    fn input(&self) -> PlanRef {
103        self.core.input.clone()
104    }
105
106    fn clone_with_input(&self, input: PlanRef) -> Self {
107        let mut core = self.core.clone();
108        core.input = input;
109        Self::new(core)
110    }
111}
112
113impl StreamNode for StreamProjectSet {
114    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
115        let (watermark_input_cols, watermark_expr_indices) = self
116            .watermark_derivations
117            .iter()
118            .map(|(i, o)| (*i as u32, *o as u32))
119            .unzip();
120        PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
121            select_list: self
122                .core
123                .select_list
124                .iter()
125                .map(|select_item| select_item.to_project_set_select_item_proto())
126                .collect_vec(),
127            watermark_input_cols,
128            watermark_expr_indices,
129            nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
130        }))
131    }
132}
133
134impl ExprRewritable for StreamProjectSet {
135    fn has_rewritable_expr(&self) -> bool {
136        true
137    }
138
139    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
140        let mut core = self.core.clone();
141        core.rewrite_exprs(r);
142        Self::new(core).into()
143    }
144}
145
146impl ExprVisitable for StreamProjectSet {
147    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
148        self.core.visit_exprs(v);
149    }
150}