risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanRef;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode};
29use crate::catalog::TableCatalog;
30use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
33
34/// `StreamMaterializedExprs` materializes the results of a set of expressions.
35/// The expressions are evaluated once and the results are stored in a state table,
36/// avoiding re-evaluation for delete operations.
37/// Particularly useful for expensive or non-deterministic expressions like UDF calls.
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamMaterializedExprs {
40    pub base: PlanBase<Stream>,
41    input: PlanRef,
42    exprs: Vec<ExprImpl>,
43    /// Mapping from expr index to field name. May not contain all exprs.
44    field_names: BTreeMap<usize, String>,
45    state_clean_col_idx: Option<usize>,
46}
47
48impl Distill for StreamMaterializedExprs {
49    fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
50        let verbose = self.base.ctx().is_explain_verbose();
51
52        let schema = self.schema();
53        let mut vec = vec![{
54            let f = |t| Pretty::debug(&t);
55            let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
56            ("exprs", e)
57        }];
58        if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
59        {
60            vec.push(("output_watermarks", display_output_watermarks));
61        }
62        if verbose && self.state_clean_col_idx.is_some() {
63            vec.push((
64                "state_clean_col_idx",
65                Pretty::display(&self.state_clean_col_idx.unwrap()),
66            ));
67        }
68
69        childless_record("StreamMaterializedExprs", vec)
70    }
71}
72
73impl StreamMaterializedExprs {
74    /// Creates a new `StreamMaterializedExprs` node.
75    pub fn new(input: PlanRef, exprs: Vec<ExprImpl>, field_names: BTreeMap<usize, String>) -> Self {
76        let input_watermark_cols = input.watermark_columns();
77
78        // Determine if we have a watermark column for state cleaning
79        let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
80
81        // Create a functional dependency set that includes dependencies from UDF inputs to outputs
82        let input_len = input.schema().len();
83        let output_len = input_len + exprs.len();
84
85        // First, rewrite existing functional dependencies from input
86        let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
87        let mut fd_set =
88            mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
89
90        // Then, add dependencies from UDF parameters to UDF outputs
91        for (i, expr) in exprs.iter().enumerate() {
92            let output_idx = input_len + i;
93
94            // Create a dependency from all input references in the expression to the output
95            let input_refs = collect_input_refs(input_len, std::iter::once(expr));
96            let input_indices: Vec<_> = input_refs.ones().collect();
97
98            if !input_indices.is_empty() {
99                fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
100            }
101        }
102
103        let base = PlanBase::new_stream(
104            input.ctx(),
105            Self::derive_schema(&input, &exprs, &field_names),
106            input.stream_key().map(|v| v.to_vec()),
107            fd_set,
108            input.distribution().clone(),
109            input.append_only(),
110            input.emit_on_window_close(),
111            input.watermark_columns().clone(),
112            input.columns_monotonicity().clone(),
113        );
114
115        Self {
116            base,
117            input,
118            exprs,
119            field_names,
120            state_clean_col_idx,
121        }
122    }
123
124    fn derive_schema(
125        input: &PlanRef,
126        exprs: &[ExprImpl],
127        field_names: &BTreeMap<usize, String>,
128    ) -> Schema {
129        let ctx = input.ctx();
130        let input_schema = input.schema();
131        let mut all_fields = input_schema.fields.clone();
132        all_fields.reserve(exprs.len());
133
134        for (i, expr) in exprs.iter().enumerate() {
135            let field_name = match field_names.get(&i) {
136                Some(name) => name.clone(),
137                None => format!("$expr{}", ctx.next_expr_display_id()),
138            };
139            let field = Field::with_name(expr.return_type(), field_name);
140            all_fields.push(field);
141        }
142
143        Schema::new(all_fields)
144    }
145
146    fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
147        let input_schema_len = self.input.schema().len();
148
149        self.exprs
150            .iter()
151            .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
152            .map(|(expr, field)| AliasedExpr {
153                expr: ExprDisplay {
154                    expr,
155                    input_schema: self.input.schema(),
156                },
157                alias: Some(field.name.clone()),
158            })
159            .collect()
160    }
161
162    /// Builds a state table catalog for `StreamMaterializedExprs`
163    fn build_state_table(&self) -> TableCatalog {
164        let mut catalog_builder = TableCatalogBuilder::default();
165        let dist_keys = self.distribution().dist_column_indices().to_vec();
166
167        // Add all columns
168        self.schema().fields().iter().for_each(|field| {
169            catalog_builder.add_column(field);
170        });
171
172        // Add table PK columns
173        let mut pk_indices = self
174            .input
175            .stream_key()
176            .expect("Expected stream key")
177            .to_vec();
178        let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
179            if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
180                Some(pos)
181            } else {
182                pk_indices.push(idx);
183                Some(pk_indices.len() - 1)
184            }
185        } else {
186            None
187        };
188
189        pk_indices.iter().for_each(|idx| {
190            catalog_builder.add_order_column(*idx, OrderType::ascending());
191        });
192
193        let read_prefix_len_hint = pk_indices.len();
194        let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
195
196        if let Some(idx) = clean_wtmk_in_pk {
197            catalog.clean_watermark_index_in_pk = Some(idx);
198            catalog.cleaned_by_watermark = true;
199        }
200
201        catalog
202    }
203}
204
205impl PlanTreeNodeUnary for StreamMaterializedExprs {
206    fn input(&self) -> PlanRef {
207        self.input.clone()
208    }
209
210    fn clone_with_input(&self, input: PlanRef) -> Self {
211        Self::new(input, self.exprs.clone(), self.field_names.clone())
212    }
213}
214impl_plan_tree_node_for_unary! { StreamMaterializedExprs }
215
216impl StreamNode for StreamMaterializedExprs {
217    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
218        PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
219            exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
220            state_table: Some(
221                self.build_state_table()
222                    .with_id(state.gen_table_id_wrapped())
223                    .to_internal_table_prost(),
224            ),
225            state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
226        }))
227    }
228}
229
230impl ExprRewritable for StreamMaterializedExprs {
231    fn has_rewritable_expr(&self) -> bool {
232        true
233    }
234
235    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
236        let new_exprs = self
237            .exprs
238            .iter()
239            .map(|e| r.rewrite_expr(e.clone()))
240            .collect();
241        Self::new(self.input.clone(), new_exprs, self.field_names.clone()).into()
242    }
243}
244
245impl ExprVisitable for StreamMaterializedExprs {
246    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
247        self.exprs.iter().for_each(|e| v.visit_expr(e));
248    }
249}