risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanNodeMetadata;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{
29    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::catalog::TableCatalog;
32use crate::error::Result;
33use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
34use crate::optimizer::property::reject_upsert_input;
35use crate::stream_fragmenter::BuildFragmentGraphState;
36use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
37
/// `StreamMaterializedExprs` materializes the results of a set of expressions.
/// The expressions are evaluated once and the results are stored in a state table,
/// avoiding re-evaluation for delete operations.
/// Particularly useful for expensive or non-deterministic expressions like UDF calls.
///
/// The output schema is the input schema followed by one column per expression
/// (see `derive_schema`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterializedExprs {
    /// Plan metadata built by `PlanBase::new_stream` in `new`.
    pub base: PlanBase<Stream>,
    /// The single child plan whose columns are passed through unchanged.
    input: PlanRef,
    /// Expressions whose results are appended after the input columns, in this order.
    exprs: Vec<ExprImpl>,
    /// Mapping from expr index to field name. May not contain all exprs.
    /// Expressions without an entry get a generated `$expr<N>` name.
    field_names: BTreeMap<usize, String>,
    /// Index of the first watermark column of the input, if the input has any.
    /// When set, it is made part of the state table's order key and used to mark
    /// the table as cleaned by watermark (see `build_state_table`).
    state_clean_col_idx: Option<usize>,
}
51
52impl Distill for StreamMaterializedExprs {
53    fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
54        let verbose = self.base.ctx().is_explain_verbose();
55
56        let schema = self.schema();
57        let mut vec = vec![{
58            let f = |t| Pretty::debug(&t);
59            let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
60            ("exprs", e)
61        }];
62        if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
63        {
64            vec.push(("output_watermarks", display_output_watermarks));
65        }
66        if verbose && self.state_clean_col_idx.is_some() {
67            vec.push((
68                "state_clean_col_idx",
69                Pretty::display(&self.state_clean_col_idx.unwrap()),
70            ));
71        }
72
73        childless_record("StreamMaterializedExprs", vec)
74    }
75}
76
77impl StreamMaterializedExprs {
78    /// Creates a new `StreamMaterializedExprs` node.
79    pub fn new(
80        input: PlanRef,
81        exprs: Vec<ExprImpl>,
82        field_names: BTreeMap<usize, String>,
83    ) -> Result<Self> {
84        let input_watermark_cols = input.watermark_columns();
85
86        // Determine if we have a watermark column for state cleaning
87        let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
88
89        // Create a functional dependency set that includes dependencies from UDF inputs to outputs
90        let input_len = input.schema().len();
91        let output_len = input_len + exprs.len();
92
93        // First, rewrite existing functional dependencies from input
94        let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
95        let mut fd_set =
96            mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
97
98        // Then, add dependencies from UDF parameters to UDF outputs
99        for (i, expr) in exprs.iter().enumerate() {
100            let output_idx = input_len + i;
101
102            // Create a dependency from all input references in the expression to the output
103            let input_refs = collect_input_refs(input_len, std::iter::once(expr));
104            let input_indices: Vec<_> = input_refs.ones().collect();
105
106            if !input_indices.is_empty() {
107                fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
108            }
109        }
110
111        let base = PlanBase::new_stream(
112            input.ctx(),
113            Self::derive_schema(&input, &exprs, &field_names),
114            input.stream_key().map(|v| v.to_vec()),
115            fd_set,
116            input.distribution().clone(),
117            // Note: even though we persist the evaluation results, there are still passthrough columns.
118            // We still cannot handle upsert input.
119            reject_upsert_input!(input),
120            input.emit_on_window_close(),
121            input.watermark_columns().clone(),
122            input.columns_monotonicity().clone(),
123        );
124
125        Ok(Self {
126            base,
127            input,
128            exprs,
129            field_names,
130            state_clean_col_idx,
131        })
132    }
133
134    fn derive_schema(
135        input: &PlanRef,
136        exprs: &[ExprImpl],
137        field_names: &BTreeMap<usize, String>,
138    ) -> Schema {
139        let ctx = input.ctx();
140        let input_schema = input.schema();
141        let mut all_fields = input_schema.fields.clone();
142        all_fields.reserve(exprs.len());
143
144        for (i, expr) in exprs.iter().enumerate() {
145            let field_name = match field_names.get(&i) {
146                Some(name) => name.clone(),
147                None => format!("$expr{}", ctx.next_expr_display_id()),
148            };
149            let field = Field::with_name(expr.return_type(), field_name);
150            all_fields.push(field);
151        }
152
153        Schema::new(all_fields)
154    }
155
156    fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
157        let input_schema_len = self.input.schema().len();
158
159        self.exprs
160            .iter()
161            .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
162            .map(|(expr, field)| AliasedExpr {
163                expr: ExprDisplay {
164                    expr,
165                    input_schema: self.input.schema(),
166                },
167                alias: Some(field.name.clone()),
168            })
169            .collect()
170    }
171
172    /// Builds a state table catalog for `StreamMaterializedExprs`
173    fn build_state_table(&self) -> TableCatalog {
174        let mut catalog_builder = TableCatalogBuilder::default();
175        let dist_keys = self.distribution().dist_column_indices().to_vec();
176
177        // Add all columns
178        self.schema().fields().iter().for_each(|field| {
179            catalog_builder.add_column(field);
180        });
181
182        // Add table PK columns
183        let mut pk_indices = self
184            .input
185            .stream_key()
186            .expect("Expected stream key")
187            .to_vec();
188        let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
189            if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
190                Some(pos)
191            } else {
192                pk_indices.push(idx);
193                Some(pk_indices.len() - 1)
194            }
195        } else {
196            None
197        };
198
199        pk_indices.iter().for_each(|idx| {
200            catalog_builder.add_order_column(*idx, OrderType::ascending());
201        });
202
203        let read_prefix_len_hint = pk_indices.len();
204        let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
205
206        if let Some(idx) = clean_wtmk_in_pk {
207            catalog.clean_watermark_index_in_pk = Some(idx);
208            catalog.cleaned_by_watermark = true;
209        }
210
211        catalog
212    }
213}
214
215impl PlanTreeNodeUnary<Stream> for StreamMaterializedExprs {
216    fn input(&self) -> PlanRef {
217        self.input.clone()
218    }
219
220    fn clone_with_input(&self, input: PlanRef) -> Self {
221        Self::new(input, self.exprs.clone(), self.field_names.clone()).unwrap()
222    }
223}
224impl_plan_tree_node_for_unary! { Stream, StreamMaterializedExprs }
225
226impl StreamNode for StreamMaterializedExprs {
227    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
228        PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
229            exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
230            state_table: Some(
231                self.build_state_table()
232                    .with_id(state.gen_table_id_wrapped())
233                    .to_internal_table_prost(),
234            ),
235            state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
236        }))
237    }
238}
239
240impl ExprRewritable<Stream> for StreamMaterializedExprs {
241    fn has_rewritable_expr(&self) -> bool {
242        true
243    }
244
245    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
246        let new_exprs = self
247            .exprs
248            .iter()
249            .map(|e| r.rewrite_expr(e.clone()))
250            .collect();
251        Self::new(self.input.clone(), new_exprs, self.field_names.clone())
252            .unwrap()
253            .into()
254    }
255}
256
257impl ExprVisitable for StreamMaterializedExprs {
258    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
259        self.exprs.iter().for_each(|e| v.visit_expr(e));
260    }
261}