risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs1use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanNodeMetadata;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{
29 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::catalog::TableCatalog;
32use crate::error::Result;
33use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
34use crate::optimizer::property::reject_upsert_input;
35use crate::stream_fragmenter::BuildFragmentGraphState;
36use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
37
/// Stream plan node that evaluates `exprs` against its input and appends one
/// output column per expression after all input columns.
///
/// Besides the expressions, the node emits a state table (built by
/// `build_state_table`) whose columns are the full output schema and whose pk is
/// the input stream key, extended with the state-cleaning watermark column when
/// that column is not already part of the key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterializedExprs {
    pub base: PlanBase<Stream>,
    /// Child plan; its columns form the prefix of this node's output schema.
    input: PlanRef,
    /// Expressions to evaluate; expression `i` produces output column
    /// `input.schema().len() + i`.
    exprs: Vec<ExprImpl>,
    /// Optional output-column names keyed by index into `exprs`. Indices without
    /// an entry get a generated `$exprN` name, so this may not cover every expression.
    field_names: BTreeMap<usize, String>,
    /// Index of the first input watermark column, if any; used as the state
    /// table's clean-watermark column.
    state_clean_col_idx: Option<usize>,
}
51
52impl Distill for StreamMaterializedExprs {
53 fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
54 let verbose = self.base.ctx().is_explain_verbose();
55
56 let schema = self.schema();
57 let mut vec = vec![{
58 let f = |t| Pretty::debug(&t);
59 let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
60 ("exprs", e)
61 }];
62 if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
63 {
64 vec.push(("output_watermarks", display_output_watermarks));
65 }
66 if let Some(idx) = self.state_clean_col_idx
67 && verbose
68 {
69 vec.push(("state_clean_col_idx", Pretty::display(&idx)));
70 }
71
72 childless_record("StreamMaterializedExprs", vec)
73 }
74}
75
76impl StreamMaterializedExprs {
77 pub fn new(
79 input: PlanRef,
80 exprs: Vec<ExprImpl>,
81 field_names: BTreeMap<usize, String>,
82 ) -> Result<Self> {
83 let input_watermark_cols = input.watermark_columns();
84
85 let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
87
88 let input_len = input.schema().len();
90 let output_len = input_len + exprs.len();
91
92 let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
94 let mut fd_set =
95 mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
96
97 for (i, expr) in exprs.iter().enumerate() {
99 let output_idx = input_len + i;
100
101 let input_refs = collect_input_refs(input_len, std::iter::once(expr));
103 let input_indices: Vec<_> = input_refs.ones().collect();
104
105 if !input_indices.is_empty() {
106 fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
107 }
108 }
109
110 let base = PlanBase::new_stream(
111 input.ctx(),
112 Self::derive_schema(&input, &exprs, &field_names),
113 input.stream_key().map(|v| v.to_vec()),
114 fd_set,
115 input.distribution().clone(),
116 reject_upsert_input!(input),
119 input.emit_on_window_close(),
120 input.watermark_columns().clone(),
121 input.columns_monotonicity().clone(),
122 );
123
124 Ok(Self {
125 base,
126 input,
127 exprs,
128 field_names,
129 state_clean_col_idx,
130 })
131 }
132
133 fn derive_schema(
134 input: &PlanRef,
135 exprs: &[ExprImpl],
136 field_names: &BTreeMap<usize, String>,
137 ) -> Schema {
138 let ctx = input.ctx();
139 let input_schema = input.schema();
140 let mut all_fields = input_schema.fields.clone();
141 all_fields.reserve(exprs.len());
142
143 for (i, expr) in exprs.iter().enumerate() {
144 let field_name = match field_names.get(&i) {
145 Some(name) => name.clone(),
146 None => format!("$expr{}", ctx.next_expr_display_id()),
147 };
148 let field = Field::with_name(expr.return_type(), field_name);
149 all_fields.push(field);
150 }
151
152 Schema::new(all_fields)
153 }
154
155 fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
156 let input_schema_len = self.input.schema().len();
157
158 self.exprs
159 .iter()
160 .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
161 .map(|(expr, field)| AliasedExpr {
162 expr: ExprDisplay {
163 expr,
164 input_schema: self.input.schema(),
165 },
166 alias: Some(field.name.clone()),
167 })
168 .collect()
169 }
170
171 fn build_state_table(&self) -> TableCatalog {
173 let mut catalog_builder = TableCatalogBuilder::default();
174 let dist_keys = self.distribution().dist_column_indices().to_vec();
175
176 self.schema().fields().iter().for_each(|field| {
178 catalog_builder.add_column(field);
179 });
180
181 let mut pk_indices = self
183 .input
184 .stream_key()
185 .expect("Expected stream key")
186 .to_vec();
187 let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
188 if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
189 Some(pos)
190 } else {
191 pk_indices.push(idx);
192 Some(pk_indices.len() - 1)
193 }
194 } else {
195 None
196 };
197
198 pk_indices.iter().for_each(|idx| {
199 catalog_builder.add_order_column(*idx, OrderType::ascending());
200 });
201
202 let read_prefix_len_hint = pk_indices.len();
203 let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
204
205 if let Some(idx) = clean_wtmk_in_pk {
206 catalog.clean_watermark_index_in_pk = Some(idx);
207 }
208
209 if let Some(col_idx) = self.state_clean_col_idx {
211 catalog.clean_watermark_indices = vec![col_idx];
212 }
213
214 catalog
215 }
216}
217
218impl PlanTreeNodeUnary<Stream> for StreamMaterializedExprs {
219 fn input(&self) -> PlanRef {
220 self.input.clone()
221 }
222
223 fn clone_with_input(&self, input: PlanRef) -> Self {
224 Self::new(input, self.exprs.clone(), self.field_names.clone()).unwrap()
225 }
226}
227impl_plan_tree_node_for_unary! { Stream, StreamMaterializedExprs }
228
229impl StreamNode for StreamMaterializedExprs {
230 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
231 PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
234 exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
235 state_table: Some(
236 self.build_state_table()
237 .with_id(state.gen_table_id_wrapped())
238 .to_internal_table_prost(),
239 ),
240 state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
241 }))
242 }
243}
244
245impl ExprRewritable<Stream> for StreamMaterializedExprs {
246 fn has_rewritable_expr(&self) -> bool {
247 true
248 }
249
250 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
251 let new_exprs = self
252 .exprs
253 .iter()
254 .map(|e| r.rewrite_expr(e.clone()))
255 .collect();
256 Self::new(self.input.clone(), new_exprs, self.field_names.clone())
257 .unwrap()
258 .into()
259 }
260}
261
262impl ExprVisitable for StreamMaterializedExprs {
263 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
264 self.exprs.iter().for_each(|e| v.visit_expr(e));
265 }
266}