Skip to main content

risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanNodeMetadata;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{
29    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::catalog::TableCatalog;
32use crate::error::Result;
33use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
34use crate::optimizer::property::reject_upsert_input;
35use crate::stream_fragmenter::BuildFragmentGraphState;
36use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
37
/// `StreamMaterializedExprs` materializes the results of a set of expressions.
/// The expressions are evaluated once and the results are stored in a state table,
/// avoiding re-evaluation for delete operations.
/// Particularly useful for expensive or non-deterministic expressions like UDF calls.
///
/// The output schema is the input schema followed by one column per expression,
/// in the order the expressions are given.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterializedExprs {
    /// Plan metadata (schema, stream key, functional dependencies, distribution,
    /// watermark columns, ...) assembled in `new`.
    pub base: PlanBase<Stream>,
    /// The single upstream plan node whose columns are passed through unchanged.
    input: PlanRef,
    /// Expressions to materialize. Expression `i` produces the output column at
    /// index `input.schema().len() + i`.
    exprs: Vec<ExprImpl>,
    /// Mapping from expr index to field name. May not contain all exprs.
    /// Expressions without an entry get a generated `$expr<N>` name.
    field_names: BTreeMap<usize, String>,
    /// Index of the first watermark column of the input, if there is one.
    /// It is appended to the state table key when not already part of it and is
    /// exported to the executor for state cleaning.
    state_clean_col_idx: Option<usize>,
}
51
52impl Distill for StreamMaterializedExprs {
53    fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
54        let verbose = self.base.ctx().is_explain_verbose();
55
56        let schema = self.schema();
57        let mut vec = vec![{
58            let f = |t| Pretty::debug(&t);
59            let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
60            ("exprs", e)
61        }];
62        if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
63        {
64            vec.push(("output_watermarks", display_output_watermarks));
65        }
66        if let Some(idx) = self.state_clean_col_idx
67            && verbose
68        {
69            vec.push(("state_clean_col_idx", Pretty::display(&idx)));
70        }
71
72        childless_record("StreamMaterializedExprs", vec)
73    }
74}
75
76impl StreamMaterializedExprs {
77    /// Creates a new `StreamMaterializedExprs` node.
78    pub fn new(
79        input: PlanRef,
80        exprs: Vec<ExprImpl>,
81        field_names: BTreeMap<usize, String>,
82    ) -> Result<Self> {
83        let input_watermark_cols = input.watermark_columns();
84
85        // Determine if we have a watermark column for state cleaning
86        let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
87
88        // Create a functional dependency set that includes dependencies from UDF inputs to outputs
89        let input_len = input.schema().len();
90        let output_len = input_len + exprs.len();
91
92        // First, rewrite existing functional dependencies from input
93        let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
94        let mut fd_set =
95            mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
96
97        // Then, add dependencies from UDF parameters to UDF outputs
98        for (i, expr) in exprs.iter().enumerate() {
99            let output_idx = input_len + i;
100
101            // Create a dependency from all input references in the expression to the output
102            let input_refs = collect_input_refs(input_len, std::iter::once(expr));
103            let input_indices: Vec<_> = input_refs.ones().collect();
104
105            if !input_indices.is_empty() {
106                fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
107            }
108        }
109
110        let base = PlanBase::new_stream(
111            input.ctx(),
112            Self::derive_schema(&input, &exprs, &field_names),
113            input.stream_key().map(|v| v.to_vec()),
114            fd_set,
115            input.distribution().clone(),
116            // Note: even though we persist the evaluation results, there are still passthrough columns.
117            // We still cannot handle upsert input.
118            reject_upsert_input!(input),
119            input.emit_on_window_close(),
120            input.watermark_columns().clone(),
121            input.columns_monotonicity().clone(),
122        );
123
124        Ok(Self {
125            base,
126            input,
127            exprs,
128            field_names,
129            state_clean_col_idx,
130        })
131    }
132
133    fn derive_schema(
134        input: &PlanRef,
135        exprs: &[ExprImpl],
136        field_names: &BTreeMap<usize, String>,
137    ) -> Schema {
138        let ctx = input.ctx();
139        let input_schema = input.schema();
140        let mut all_fields = input_schema.fields.clone();
141        all_fields.reserve(exprs.len());
142
143        for (i, expr) in exprs.iter().enumerate() {
144            let field_name = match field_names.get(&i) {
145                Some(name) => name.clone(),
146                None => format!("$expr{}", ctx.next_expr_display_id()),
147            };
148            let field = Field::with_name(expr.return_type(), field_name);
149            all_fields.push(field);
150        }
151
152        Schema::new(all_fields)
153    }
154
155    fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
156        let input_schema_len = self.input.schema().len();
157
158        self.exprs
159            .iter()
160            .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
161            .map(|(expr, field)| AliasedExpr {
162                expr: ExprDisplay {
163                    expr,
164                    input_schema: self.input.schema(),
165                },
166                alias: Some(field.name.clone()),
167            })
168            .collect()
169    }
170
171    /// Builds a state table catalog for `StreamMaterializedExprs`
172    fn build_state_table(&self) -> TableCatalog {
173        let mut catalog_builder = TableCatalogBuilder::default();
174        let dist_keys = self.distribution().dist_column_indices().to_vec();
175
176        // Add all columns
177        self.schema().fields().iter().for_each(|field| {
178            catalog_builder.add_column(field);
179        });
180
181        // Add table PK columns
182        let mut pk_indices = self
183            .input
184            .stream_key()
185            .expect("Expected stream key")
186            .to_vec();
187        let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
188            if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
189                Some(pos)
190            } else {
191                pk_indices.push(idx);
192                Some(pk_indices.len() - 1)
193            }
194        } else {
195            None
196        };
197
198        pk_indices.iter().for_each(|idx| {
199            catalog_builder.add_order_column(*idx, OrderType::ascending());
200        });
201
202        let read_prefix_len_hint = pk_indices.len();
203        let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
204
205        if let Some(idx) = clean_wtmk_in_pk {
206            catalog.clean_watermark_index_in_pk = Some(idx);
207        }
208
209        // Also populate the new clean_watermark_indices field
210        if let Some(col_idx) = self.state_clean_col_idx {
211            catalog.clean_watermark_indices = vec![col_idx];
212        }
213
214        catalog
215    }
216}
217
218impl PlanTreeNodeUnary<Stream> for StreamMaterializedExprs {
219    fn input(&self) -> PlanRef {
220        self.input.clone()
221    }
222
223    fn clone_with_input(&self, input: PlanRef) -> Self {
224        Self::new(input, self.exprs.clone(), self.field_names.clone()).unwrap()
225    }
226}
227impl_plan_tree_node_for_unary! { Stream, StreamMaterializedExprs }
228
229impl StreamNode for StreamMaterializedExprs {
230    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
231        // `StreamMaterializedExprs` is specifically used to safely evaluate impure expressions by
232        // materializing results into state. So we don't need to check for pureness here.
233        PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
234            exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
235            state_table: Some(
236                self.build_state_table()
237                    .with_id(state.gen_table_id_wrapped())
238                    .to_internal_table_prost(),
239            ),
240            state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
241        }))
242    }
243}
244
245impl ExprRewritable<Stream> for StreamMaterializedExprs {
246    fn has_rewritable_expr(&self) -> bool {
247        true
248    }
249
250    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
251        let new_exprs = self
252            .exprs
253            .iter()
254            .map(|e| r.rewrite_expr(e.clone()))
255            .collect();
256        Self::new(self.input.clone(), new_exprs, self.field_names.clone())
257            .unwrap()
258            .into()
259    }
260}
261
262impl ExprVisitable for StreamMaterializedExprs {
263    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
264        self.exprs.iter().for_each(|e| v.visit_expr(e));
265    }
266}