risingwave_frontend/optimizer/plan_node/generic/
project.rs1use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17
18use fixedbitset::FixedBitSet;
19use pretty_xmlish::{Pretty, StrAssocArr};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use super::{GenericPlanNode, GenericPlanRef};
24use crate::expr::{
25 Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
26 assert_input_ref,
27};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::FunctionalDependencySet;
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
31
32fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
33 if expr.has_subquery() {
34 return Err("subquery");
35 }
36 if expr.has_agg_call() {
37 return Err("aggregate function");
38 }
39 if expr.has_table_function() {
40 return Err("table function");
41 }
42 if expr.has_window_function() {
43 return Err("window function");
44 }
45 Ok(())
46}
47
/// Generic projection node: computes one output column per expression in `exprs`
/// over the rows of `input`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// `_private` below is a private unit field, so code outside this module cannot build a
// `Project` with a struct literal and has to go through the constructors (e.g.
// `Project::new`, which validates the expressions). Clippy's `manual_non_exhaustive`
// lint flags that pattern, hence the allow.
#[allow(clippy::manual_non_exhaustive)]
pub struct Project<PlanRef> {
    /// Output expressions. Expression `i` produces output column `i`.
    pub exprs: Vec<ExprImpl>,
    /// Optional explicit output column names, keyed by output column index.
    /// When deriving the schema, these names are ignored for `Literal` expressions.
    pub field_names: BTreeMap<usize, String>,
    /// The input plan the expressions are evaluated against.
    pub input: PlanRef,
    _private: (),
}
59
60impl<PlanRef> Project<PlanRef> {
61 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
62 self.exprs = self
63 .exprs
64 .iter()
65 .map(|e| r.rewrite_expr(e.clone()))
66 .collect();
67 }
68
69 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
70 self.exprs.iter().for_each(|e| v.visit_expr(e));
71 }
72}
73
74impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
75 fn schema(&self) -> Schema {
76 let o2i = self.o2i_col_mapping();
77 let exprs = &self.exprs;
78 let input_schema = self.input.schema();
79 let ctx = self.ctx();
80 let fields = exprs
81 .iter()
82 .enumerate()
83 .map(|(i, expr)| {
84 let name = match o2i.try_map(i) {
86 Some(input_idx) => {
87 if let Some(name) = self.field_names.get(&i) {
88 name.clone()
89 } else {
90 input_schema.fields()[input_idx].name.clone()
91 }
92 }
93 None => match expr {
94 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {
95 format!("{:?}", ExprDisplay { expr, input_schema })
96 }
97 _ => {
98 if let Some(name) = self.field_names.get(&i) {
99 name.clone()
100 } else {
101 format!("$expr{}", ctx.next_expr_display_id())
102 }
103 }
104 },
105 };
106 Field::with_name(expr.return_type(), name)
107 })
108 .collect();
109 Schema { fields }
110 }
111
112 fn stream_key(&self) -> Option<Vec<usize>> {
113 let i2o = self.i2o_col_mapping();
114 self.input
115 .stream_key()?
116 .iter()
117 .map(|pk_col| i2o.try_map(*pk_col))
118 .collect::<Option<Vec<_>>>()
119 }
120
121 fn ctx(&self) -> OptimizerContextRef {
122 self.input.ctx()
123 }
124
125 fn functional_dependency(&self) -> FunctionalDependencySet {
126 let i2o = self.i2o_col_mapping();
127 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
128 }
129}
130
131impl<PlanRef: GenericPlanRef> Project<PlanRef> {
132 pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
133 for expr in &exprs {
134 assert_input_ref!(expr, input.schema().fields().len());
135 check_expr_type(expr)
136 .map_err(|expr| format!("{expr} should not in Project operator"))
137 .unwrap();
138 }
139 Project {
140 exprs,
141 field_names: Default::default(),
142 input,
143 _private: (),
144 }
145 }
146
147 pub fn with_mapping(input: PlanRef, mapping: ColIndexMapping) -> Self {
154 if mapping.target_size() == 0 {
155 return Self::new(vec![], input);
158 };
159 let mut input_refs = vec![None; mapping.target_size()];
160 for (src, tar) in mapping.mapping_pairs() {
161 assert_eq!(input_refs[tar], None);
162 input_refs[tar] = Some(src);
163 }
164 let input_schema = input.schema();
165 let exprs: Vec<ExprImpl> = input_refs
166 .into_iter()
167 .map(|i| i.unwrap())
168 .map(|i| InputRef::new(i, input_schema.fields()[i].data_type()).into())
169 .collect();
170
171 Self::new(exprs, input)
172 }
173
174 pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
176 Self::with_out_col_idx(input, out_fields.ones())
177 }
178
179 pub fn with_out_col_idx(input: PlanRef, out_fields: impl Iterator<Item = usize>) -> Self {
181 let input_schema = input.schema();
182 let exprs = out_fields
183 .map(|index| InputRef::new(index, input_schema[index].data_type()).into())
184 .collect();
185 Self::new(exprs, input)
186 }
187
188 pub fn with_vnode_col(input: PlanRef, dist_key: &[usize]) -> Self {
190 let input_fields = input.schema().fields();
191 let mut new_exprs: Vec<_> = input_fields
192 .iter()
193 .enumerate()
194 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
195 .collect();
196 new_exprs.push(
197 FunctionCall::new(
198 ExprType::Vnode,
199 dist_key
200 .iter()
201 .map(|idx| InputRef::new(*idx, input_fields[*idx].data_type()).into())
202 .collect(),
203 )
204 .expect("Vnode function call should be valid here")
205 .into(),
206 );
207 let vnode_expr_idx = new_exprs.len() - 1;
208
209 let mut new = Self::new(new_exprs, input);
210 new.field_names.insert(vnode_expr_idx, "_vnode".to_owned());
211 new
212 }
213
214 pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
215 (self.exprs, self.input)
216 }
217
218 pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
219 let f = |t| Pretty::debug(&t);
220 let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
221 vec![("exprs", e)]
222 }
223
224 fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
225 self.exprs
226 .iter()
227 .zip_eq_fast(schema.fields().iter())
228 .map(|(expr, field)| AliasedExpr {
229 expr: ExprDisplay {
230 expr,
231 input_schema: self.input.schema(),
232 },
233 alias: {
234 match expr {
235 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
236 _ => Some(field.name.clone()),
237 }
238 },
239 })
240 .collect()
241 }
242
243 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
244 let exprs = &self.exprs;
245 let input_len = self.input.schema().len();
246 let mut map = vec![None; exprs.len()];
247 for (i, expr) in exprs.iter().enumerate() {
248 if let ExprImpl::InputRef(input) = expr {
249 map[i] = Some(input.index())
250 }
251 }
252 ColIndexMapping::new(map, input_len)
253 }
254
255 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
258 let exprs = &self.exprs;
259 let input_len = self.input.schema().len();
260 let mut map = vec![None; input_len];
261 for (i, expr) in exprs.iter().enumerate() {
262 if let ExprImpl::InputRef(input) = expr {
263 map[input.index()] = Some(i)
264 }
265 }
266 ColIndexMapping::new(map, exprs.len())
267 }
268
269 pub fn is_all_inputref(&self) -> bool {
270 self.exprs
271 .iter()
272 .all(|expr| matches!(expr, ExprImpl::InputRef(_)))
273 }
274
275 pub fn is_identity(&self) -> bool {
276 self.exprs.len() == self.input.schema().len()
277 && self
278 .exprs
279 .iter()
280 .zip_eq_fast(self.input.schema().fields())
281 .enumerate()
282 .all(|(i, (expr, field))| {
283 matches!(expr, ExprImpl::InputRef(input_ref) if **input_ref == InputRef::new(i, field.data_type()))
284 })
285 }
286
287 pub fn try_as_projection(&self) -> Option<Vec<usize>> {
288 self.exprs
289 .iter()
290 .map(|expr| match expr {
291 ExprImpl::InputRef(input_ref) => Some(input_ref.index),
292 _ => None,
293 })
294 .collect::<Option<Vec<_>>>()
295 }
296
297 pub(crate) fn likely_produces_noop_updates(&self) -> bool {
298 struct HasJsonbAccess {
299 has: bool,
300 }
301
302 impl ExprVisitor for HasJsonbAccess {
303 fn visit_function_call(&mut self, func_call: &FunctionCall) {
304 if matches!(
305 func_call.func_type(),
306 ExprType::JsonbAccess
307 | ExprType::JsonbAccessStr
308 | ExprType::JsonbExtractPath
309 | ExprType::JsonbExtractPathVariadic
310 | ExprType::JsonbExtractPathText
311 | ExprType::JsonbExtractPathTextVariadic
312 | ExprType::JsonbPathExists
313 | ExprType::JsonbPathMatch
314 | ExprType::JsonbPathQueryArray
315 | ExprType::JsonbPathQueryFirst
316 ) {
317 self.has = true;
318 }
319 }
320 }
321
322 self.exprs.iter().any(|expr| {
323 let mut visitor = HasJsonbAccess { has: false };
327 visitor.visit_expr(expr);
328 visitor.has
329 })
330 }
331}
332
/// Incrementally collects expressions for a [`Project`]. Adding an expression equal
/// (per `ExprImpl`'s `Eq`/`Hash`) to one already added reuses the existing index.
#[derive(Default)]
pub struct ProjectBuilder {
    /// Collected expressions in insertion order. Position `i` becomes output column `i`.
    exprs: Vec<ExprImpl>,
    /// Reverse lookup from an expression to its position in `exprs`.
    exprs_index: HashMap<ExprImpl, usize>,
}
340
341impl ProjectBuilder {
342 pub fn add_expr(&mut self, expr: &ExprImpl) -> std::result::Result<usize, &'static str> {
345 check_expr_type(expr)?;
346 if let Some(idx) = self.exprs_index.get(expr) {
347 Ok(*idx)
348 } else {
349 let index = self.exprs.len();
350 self.exprs.push(expr.clone());
351 self.exprs_index.insert(expr.clone(), index);
352 Ok(index)
353 }
354 }
355
356 pub fn get_expr(&self, index: usize) -> Option<&ExprImpl> {
357 self.exprs.get(index)
358 }
359
360 pub fn expr_index(&self, expr: &ExprImpl) -> Option<usize> {
361 check_expr_type(expr).ok()?;
362 self.exprs_index.get(expr).copied()
363 }
364
365 pub fn build<PlanRef: GenericPlanRef>(self, input: PlanRef) -> Project<PlanRef> {
367 Project::new(self.exprs, input)
368 }
369
370 pub fn exprs_len(&self) -> usize {
371 self.exprs.len()
372 }
373}
374
375pub struct AliasedExpr<'a> {
377 pub expr: ExprDisplay<'a>,
378 pub alias: Option<String>,
379}
380
381impl fmt::Debug for AliasedExpr<'_> {
382 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383 match &self.alias {
384 Some(alias) => write!(f, "{:?} as {}", self.expr, alias),
385 None => write!(f, "{:?}", self.expr),
386 }
387 }
388}