risingwave_frontend/optimizer/plan_node/
stream_materialized_exprs.rs1use std::collections::BTreeMap;
16
17use pretty_xmlish::Pretty;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_pb::stream_plan::MaterializedExprsNode;
22use risingwave_pb::stream_plan::stream_node::PbNodeBody;
23
24use super::expr_visitable::ExprVisitable;
25use super::generic::{AliasedExpr, GenericPlanRef, PhysicalPlanRef};
26use super::stream::StreamPlanNodeMetadata;
27use super::utils::{Distill, TableCatalogBuilder, childless_record, watermark_pretty};
28use super::{
29 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::catalog::TableCatalog;
32use crate::error::Result;
33use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor, collect_input_refs};
34use crate::optimizer::property::reject_upsert_input;
35use crate::stream_fragmenter::BuildFragmentGraphState;
36use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
37
/// Stream plan node that evaluates `exprs` for every input row and appends the results as new
/// columns after all input columns.
///
/// Every output row (input columns plus expression results) is recorded in an internal state
/// table whose primary key is the input stream key, extended with the state-cleaning column when
/// one exists and is not already part of that key.
/// NOTE(review): presumably this lets the executor reuse previously computed results on
/// retraction instead of re-evaluating the expressions — confirm against the executor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterializedExprs {
    pub base: PlanBase<Stream>,
    /// The single child plan.
    input: PlanRef,
    /// Expressions to evaluate; `exprs[i]` becomes output column `input_len + i`.
    exprs: Vec<ExprImpl>,
    /// Optional output column names keyed by the index into `exprs` (not into the output
    /// schema); expressions without an entry get a generated `$exprN` name.
    field_names: BTreeMap<usize, String>,
    /// Index of the first input watermark column, if any; it is used to clean the state table.
    state_clean_col_idx: Option<usize>,
}
51
52impl Distill for StreamMaterializedExprs {
53 fn distill<'a>(&self) -> pretty_xmlish::XmlNode<'a> {
54 let verbose = self.base.ctx().is_explain_verbose();
55
56 let schema = self.schema();
57 let mut vec = vec![{
58 let f = |t| Pretty::debug(&t);
59 let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
60 ("exprs", e)
61 }];
62 if let Some(display_output_watermarks) = watermark_pretty(self.watermark_columns(), schema)
63 {
64 vec.push(("output_watermarks", display_output_watermarks));
65 }
66 if verbose && self.state_clean_col_idx.is_some() {
67 vec.push((
68 "state_clean_col_idx",
69 Pretty::display(&self.state_clean_col_idx.unwrap()),
70 ));
71 }
72
73 childless_record("StreamMaterializedExprs", vec)
74 }
75}
76
77impl StreamMaterializedExprs {
78 pub fn new(
80 input: PlanRef,
81 exprs: Vec<ExprImpl>,
82 field_names: BTreeMap<usize, String>,
83 ) -> Result<Self> {
84 let input_watermark_cols = input.watermark_columns();
85
86 let state_clean_col_idx = input_watermark_cols.iter().next().map(|(i, _)| i);
88
89 let input_len = input.schema().len();
91 let output_len = input_len + exprs.len();
92
93 let mapping = ColIndexMapping::identity_or_none(input_len, output_len);
95 let mut fd_set =
96 mapping.rewrite_functional_dependency_set(input.functional_dependency().clone());
97
98 for (i, expr) in exprs.iter().enumerate() {
100 let output_idx = input_len + i;
101
102 let input_refs = collect_input_refs(input_len, std::iter::once(expr));
104 let input_indices: Vec<_> = input_refs.ones().collect();
105
106 if !input_indices.is_empty() {
107 fd_set.add_functional_dependency_by_column_indices(&input_indices, &[output_idx]);
108 }
109 }
110
111 let base = PlanBase::new_stream(
112 input.ctx(),
113 Self::derive_schema(&input, &exprs, &field_names),
114 input.stream_key().map(|v| v.to_vec()),
115 fd_set,
116 input.distribution().clone(),
117 reject_upsert_input!(input),
120 input.emit_on_window_close(),
121 input.watermark_columns().clone(),
122 input.columns_monotonicity().clone(),
123 );
124
125 Ok(Self {
126 base,
127 input,
128 exprs,
129 field_names,
130 state_clean_col_idx,
131 })
132 }
133
134 fn derive_schema(
135 input: &PlanRef,
136 exprs: &[ExprImpl],
137 field_names: &BTreeMap<usize, String>,
138 ) -> Schema {
139 let ctx = input.ctx();
140 let input_schema = input.schema();
141 let mut all_fields = input_schema.fields.clone();
142 all_fields.reserve(exprs.len());
143
144 for (i, expr) in exprs.iter().enumerate() {
145 let field_name = match field_names.get(&i) {
146 Some(name) => name.clone(),
147 None => format!("$expr{}", ctx.next_expr_display_id()),
148 };
149 let field = Field::with_name(expr.return_type(), field_name);
150 all_fields.push(field);
151 }
152
153 Schema::new(all_fields)
154 }
155
156 fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
157 let input_schema_len = self.input.schema().len();
158
159 self.exprs
160 .iter()
161 .zip_eq_fast(schema.fields().iter().skip(input_schema_len))
162 .map(|(expr, field)| AliasedExpr {
163 expr: ExprDisplay {
164 expr,
165 input_schema: self.input.schema(),
166 },
167 alias: Some(field.name.clone()),
168 })
169 .collect()
170 }
171
172 fn build_state_table(&self) -> TableCatalog {
174 let mut catalog_builder = TableCatalogBuilder::default();
175 let dist_keys = self.distribution().dist_column_indices().to_vec();
176
177 self.schema().fields().iter().for_each(|field| {
179 catalog_builder.add_column(field);
180 });
181
182 let mut pk_indices = self
184 .input
185 .stream_key()
186 .expect("Expected stream key")
187 .to_vec();
188 let clean_wtmk_in_pk = if let Some(idx) = self.state_clean_col_idx {
189 if let Some(pos) = pk_indices.iter().position(|&x| x == idx) {
190 Some(pos)
191 } else {
192 pk_indices.push(idx);
193 Some(pk_indices.len() - 1)
194 }
195 } else {
196 None
197 };
198
199 pk_indices.iter().for_each(|idx| {
200 catalog_builder.add_order_column(*idx, OrderType::ascending());
201 });
202
203 let read_prefix_len_hint = pk_indices.len();
204 let mut catalog = catalog_builder.build(dist_keys, read_prefix_len_hint);
205
206 if let Some(idx) = clean_wtmk_in_pk {
207 catalog.clean_watermark_index_in_pk = Some(idx);
208 catalog.cleaned_by_watermark = true;
209 }
210
211 catalog
212 }
213}
214
215impl PlanTreeNodeUnary<Stream> for StreamMaterializedExprs {
216 fn input(&self) -> PlanRef {
217 self.input.clone()
218 }
219
220 fn clone_with_input(&self, input: PlanRef) -> Self {
221 Self::new(input, self.exprs.clone(), self.field_names.clone()).unwrap()
222 }
223}
// Generates the shared plan-tree plumbing for this unary stream node.
// NOTE(review): the macro is defined elsewhere; presumably it builds on the
// `PlanTreeNodeUnary` impl for this type — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Stream, StreamMaterializedExprs }
225
226impl StreamNode for StreamMaterializedExprs {
227 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
228 PbNodeBody::MaterializedExprs(Box::new(MaterializedExprsNode {
229 exprs: self.exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
230 state_table: Some(
231 self.build_state_table()
232 .with_id(state.gen_table_id_wrapped())
233 .to_internal_table_prost(),
234 ),
235 state_clean_col_idx: self.state_clean_col_idx.map(|idx| idx as u32),
236 }))
237 }
238}
239
240impl ExprRewritable<Stream> for StreamMaterializedExprs {
241 fn has_rewritable_expr(&self) -> bool {
242 true
243 }
244
245 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
246 let new_exprs = self
247 .exprs
248 .iter()
249 .map(|e| r.rewrite_expr(e.clone()))
250 .collect();
251 Self::new(self.input.clone(), new_exprs, self.field_names.clone())
252 .unwrap()
253 .into()
254 }
255}
256
257impl ExprVisitable for StreamMaterializedExprs {
258 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
259 self.exprs.iter().for_each(|e| v.visit_expr(e));
260 }
261}