risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs1use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanRef;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode};
29use crate::catalog::TableCatalog;
30use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
33
/// Stream plan node that passes every input column through unchanged and appends the results of
/// `exprs` as additional columns after them.
///
/// When serialized it carries an internal state table (built by `build_state_table`) that stores
/// all output columns, keyed by the input stream key.
///
/// NOTE(review): the motivation for materializing these expressions instead of recomputing them
/// in a plain projection is not visible in this file — presumably so that retractions re-emit the
/// originally computed values; confirm against the executor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterializedExprs {
    /// Common stream plan properties (schema, stream key, functional dependencies, distribution,
    /// watermark columns, ...), mostly inherited from `input`.
    pub base: PlanBase<Stream>,
    /// The child plan whose columns appear first in the output, at their original indices.
    input: PlanRef,
    /// Expressions evaluated against `input`; the result of `exprs[i]` becomes output column
    /// `input.schema().len() + i`.
    exprs: Vec<ExprImpl>,
    /// Optional output field names keyed by index into `exprs`; expressions without an entry
    /// get a generated `$exprN` name.
    field_names: BTreeMap<usize, String>,
    /// Input column whose watermark is used to clean the state table: the first of the input's
    /// watermark columns, or `None` when the input has no watermark columns.
    state_clean_col_idx: Option<usize>,
}
47
48impl Distill for StreamMaterializedExprs {
49 fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
50 let verbose = self.base.ctx().is_explain_verbose();
51
52 let schema = self.schema();
53 let mut vec = vec![{
54 let f = |t| Pretty::debug(&t);
55 let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
56 ("exprs", e)
57 }];
58 if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
59 {
60 vec.push(("output_watermarks", display_output_watermarks));
61 }
62 if verbose && self.state_clean_col_idx.is_some() {
63 vec.push((
64 "state_clean_col_idx",
65 Pretty::display(&self.state_clean_col_idx.unwrap()),
66 ));
67 }
68
69 childless_record("StreamMaterializedExprs", vec)
70 }
71}
72
73impl StreamMaterializedExprs {
74 pub fn new(input: PlanRef, exprs: Vec<ExprImpl>, field_names: BTreeMap<usize, String>) -> Self {
76 let input_watermark_cols = input.watermark_columns();
77
78 let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
80
81 let input_len = input.schema().len();
83 let output_len = input_len + exprs.len();
84
85 let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
87 let mut fd_set =
88 mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
89
90 for (i, expr) in exprs.iter().enumerate() {
92 let output_idx = input_len + i;
93
94 let input_refs = collect_input_refs(input_len, std::iter::once(expr));
96 let input_indices: Vec<_> = input_refs.ones().collect();
97
98 if !input_indices.is_empty() {
99 fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
100 }
101 }
102
103 let base = PlanBase::new_stream(
104 input.ctx(),
105 Self::derive_schema(&input, &exprs, &field_names),
106 input.stream_key().map(|v| v.to_vec()),
107 fd_set,
108 input.distribution().clone(),
109 input.append_only(),
110 input.emit_on_window_close(),
111 input.watermark_columns().clone(),
112 input.columns_monotonicity().clone(),
113 );
114
115 Self {
116 base,
117 input,
118 exprs,
119 field_names,
120 state_clean_col_idx,
121 }
122 }
123
124 fn derive_schema(
125 input: &PlanRef,
126 exprs: &[ExprImpl],
127 field_names: &BTreeMap<usize, String>,
128 ) -> Schema {
129 let ctx = input.ctx();
130 let input_schema = input.schema();
131 let mut all_fields = input_schema.fields.clone();
132 all_fields.reserve(exprs.len());
133
134 for (i, expr) in exprs.iter().enumerate() {
135 let field_name = match field_names.get(&i) {
136 Some(name) => name.clone(),
137 None => format!("$expr{}", ctx.next_expr_display_id()),
138 };
139 let field = Field::with_name(expr.return_type(), field_name);
140 all_fields.push(field);
141 }
142
143 Schema::new(all_fields)
144 }
145
146 fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
147 let input_schema_len = self.input.schema().len();
148
149 self.exprs
150 .iter()
151 .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
152 .map(|(expr, field)| AliasedExpr {
153 expr: ExprDisplay {
154 expr,
155 input_schema: self.input.schema(),
156 },
157 alias: Some(field.name.clone()),
158 })
159 .collect()
160 }
161
162 fn build_state_table(&self) -> TableCatalog {
164 let mut catalog_builder = TableCatalogBuilder::default();
165 let dist_keys = self.distribution().dist_column_indices().to_vec();
166
167 self.schema().fields().iter().for_each(|field| {
169 catalog_builder.add_column(field);
170 });
171
172 let mut pk_indices = self
174 .input
175 .stream_key()
176 .expect("Expected stream key")
177 .to_vec();
178 let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
179 if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
180 Some(pos)
181 } else {
182 pk_indices.push(idx);
183 Some(pk_indices.len() - 1)
184 }
185 } else {
186 None
187 };
188
189 pk_indices.iter().for_each(|idx| {
190 catalog_builder.add_order_column(*idx, OrderType::ascending());
191 });
192
193 let read_prefix_len_hint = pk_indices.len();
194 let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
195
196 if let Some(idx) = clean_wtmk_in_pk {
197 catalog.clean_watermark_index_in_pk = Some(idx);
198 catalog.cleaned_by_watermark = true;
199 }
200
201 catalog
202 }
203}
204
205impl PlanTreeNodeUnary for StreamMaterializedExprs {
206 fn input(&self) -> PlanRef {
207 self.input.clone()
208 }
209
210 fn clone_with_input(&self, input: PlanRef) -> Self {
211 Self::new(input, self.exprs.clone(), self.field_names.clone())
212 }
213}
214impl_plan_tree_node_for_unary! { StreamMaterializedExprs }
215
216impl StreamNode for StreamMaterializedExprs {
217 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
218 PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
219 exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
220 state_table: Some(
221 self.build_state_table()
222 .with_id(state.gen_table_id_wrapped())
223 .to_internal_table_prost(),
224 ),
225 state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
226 }))
227 }
228}
229
230impl ExprRewritable for StreamMaterializedExprs {
231 fn has_rewritable_expr(&self) -> bool {
232 true
233 }
234
235 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
236 let new_exprs = self
237 .exprs
238 .iter()
239 .map(|e| r.rewrite_expr(e.clone()))
240 .collect();
241 Self::new(self.input.clone(), new_exprs, self.field_names.clone()).into()
242 }
243}
244
245impl ExprVisitable for StreamMaterializedExprs {
246 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
247 self.exprs.iter().for_each(|e| v.visit_expr(e));
248 }
249}