risingwave_frontend/optimizer/plan_node/generic/
project.rs1use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17
18use fixedbitset::FixedBitSet;
19use pretty_xmlish::{Pretty, StrAssocArr};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use super::{GenericPlanNode, GenericPlanRef};
24use crate::expr::{
25 Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
26 assert_input_ref,
27};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::FunctionalDependencySet;
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
31
32fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
33 if expr.has_subquery() {
34 return Err("subquery");
35 }
36 if expr.has_agg_call() {
37 return Err("aggregate function");
38 }
39 if expr.has_table_function() {
40 return Err("table function");
41 }
42 if expr.has_window_function() {
43 return Err("window function");
44 }
45 Ok(())
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50#[allow(clippy::manual_non_exhaustive)]
51pub struct Project<PlanRef> {
52 pub exprs: Vec<ExprImpl>,
53 pub field_names: BTreeMap<usize, String>,
55 pub input: PlanRef,
56 _private: (),
58}
59
60impl<PlanRef> Project<PlanRef> {
61 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> Project<OtherPlanRef> {
62 Project {
63 exprs: self.exprs.clone(),
64 field_names: self.field_names.clone(),
65 input,
66 _private: (),
67 }
68 }
69
70 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
71 self.exprs = self
72 .exprs
73 .iter()
74 .map(|e| r.rewrite_expr(e.clone()))
75 .collect();
76 }
77
78 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
79 self.exprs.iter().for_each(|e| v.visit_expr(e));
80 }
81}
82
83impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
84 fn schema(&self) -> Schema {
85 let o2i = self.o2i_col_mapping();
86 let exprs = &self.exprs;
87 let input_schema = self.input.schema();
88 let ctx = self.ctx();
89 let fields = exprs
90 .iter()
91 .enumerate()
92 .map(|(i, expr)| {
93 let name = match o2i.try_map(i) {
95 Some(input_idx) => {
96 if let Some(name) = self.field_names.get(&i) {
97 name.clone()
98 } else {
99 input_schema.fields()[input_idx].name.clone()
100 }
101 }
102 None => match expr {
103 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {
104 format!("{:?}", ExprDisplay { expr, input_schema })
105 }
106 _ => {
107 if let Some(name) = self.field_names.get(&i) {
108 name.clone()
109 } else {
110 format!("$expr{}", ctx.next_expr_display_id())
111 }
112 }
113 },
114 };
115 Field::with_name(expr.return_type(), name)
116 })
117 .collect();
118 Schema { fields }
119 }
120
121 fn stream_key(&self) -> Option<Vec<usize>> {
122 let i2o = self.i2o_col_mapping();
123 self.input
124 .stream_key()?
125 .iter()
126 .map(|pk_col| i2o.try_map(*pk_col))
127 .collect::<Option<Vec<_>>>()
128 }
129
130 fn ctx(&self) -> OptimizerContextRef {
131 self.input.ctx()
132 }
133
134 fn functional_dependency(&self) -> FunctionalDependencySet {
135 let i2o = self.i2o_col_mapping();
136 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
137 }
138}
139
140impl<PlanRef: GenericPlanRef> Project<PlanRef> {
141 pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
142 for expr in &exprs {
143 assert_input_ref!(expr, input.schema().fields().len());
144 check_expr_type(expr)
145 .map_err(|expr| format!("{expr} should not in Project operator"))
146 .unwrap();
147 }
148 Project {
149 exprs,
150 field_names: Default::default(),
151 input,
152 _private: (),
153 }
154 }
155
156 pub fn with_mapping(input: PlanRef, mapping: ColIndexMapping) -> Self {
163 if mapping.target_size() == 0 {
164 return Self::new(vec![], input);
167 };
168 let mut input_refs = vec![None; mapping.target_size()];
169 for (src, tar) in mapping.mapping_pairs() {
170 assert_eq!(input_refs[tar], None);
171 input_refs[tar] = Some(src);
172 }
173 let input_schema = input.schema();
174 let exprs: Vec<ExprImpl> = input_refs
175 .into_iter()
176 .map(|i| i.unwrap())
177 .map(|i| InputRef::new(i, input_schema.fields()[i].data_type()).into())
178 .collect();
179
180 Self::new(exprs, input)
181 }
182
183 pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
185 Self::with_out_col_idx(input, out_fields.ones())
186 }
187
188 pub fn with_out_col_idx(input: PlanRef, out_fields: impl Iterator<Item = usize>) -> Self {
190 let input_schema = input.schema();
191 let exprs = out_fields
192 .map(|index| InputRef::new(index, input_schema[index].data_type()).into())
193 .collect();
194 Self::new(exprs, input)
195 }
196
197 pub fn with_vnode_col(input: PlanRef, dist_key: &[usize]) -> Self {
199 let input_fields = input.schema().fields();
200 let mut new_exprs: Vec<_> = input_fields
201 .iter()
202 .enumerate()
203 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
204 .collect();
205 new_exprs.push(
206 FunctionCall::new(
207 ExprType::Vnode,
208 dist_key
209 .iter()
210 .map(|idx| InputRef::new(*idx, input_fields[*idx].data_type()).into())
211 .collect(),
212 )
213 .expect("Vnode function call should be valid here")
214 .into(),
215 );
216 let vnode_expr_idx = new_exprs.len() - 1;
217
218 let mut new = Self::new(new_exprs, input);
219 new.field_names.insert(vnode_expr_idx, "_vnode".to_owned());
220 new
221 }
222
223 pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
224 (self.exprs, self.input)
225 }
226
227 pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
228 let f = |t| Pretty::debug(&t);
229 let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
230 vec![("exprs", e)]
231 }
232
233 fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
234 self.exprs
235 .iter()
236 .zip_eq_fast(schema.fields().iter())
237 .map(|(expr, field)| AliasedExpr {
238 expr: ExprDisplay {
239 expr,
240 input_schema: self.input.schema(),
241 },
242 alias: {
243 match expr {
244 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
245 _ => Some(field.name.clone()),
246 }
247 },
248 })
249 .collect()
250 }
251
252 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
253 let exprs = &self.exprs;
254 let input_len = self.input.schema().len();
255 let mut map = vec![None; exprs.len()];
256 for (i, expr) in exprs.iter().enumerate() {
257 if let ExprImpl::InputRef(input) = expr {
258 map[i] = Some(input.index())
259 }
260 }
261 ColIndexMapping::new(map, input_len)
262 }
263
264 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
267 let exprs = &self.exprs;
268 let input_len = self.input.schema().len();
269 let mut map = vec![None; input_len];
270 for (i, expr) in exprs.iter().enumerate() {
271 if let ExprImpl::InputRef(input) = expr {
272 map[input.index()] = Some(i)
273 }
274 }
275 ColIndexMapping::new(map, exprs.len())
276 }
277
278 pub fn is_all_inputref(&self) -> bool {
279 self.exprs
280 .iter()
281 .all(|expr| matches!(expr, ExprImpl::InputRef(_)))
282 }
283
284 pub fn is_identity(&self) -> bool {
285 self.exprs.len() == self.input.schema().len()
286 && self
287 .exprs
288 .iter()
289 .zip_eq_fast(self.input.schema().fields())
290 .enumerate()
291 .all(|(i, (expr, field))| {
292 matches!(expr, ExprImpl::InputRef(input_ref) if **input_ref == InputRef::new(i, field.data_type()))
293 })
294 }
295
296 pub fn try_as_projection(&self) -> Option<Vec<usize>> {
297 self.exprs
298 .iter()
299 .map(|expr| match expr {
300 ExprImpl::InputRef(input_ref) => Some(input_ref.index),
301 _ => None,
302 })
303 .collect::<Option<Vec<_>>>()
304 }
305
306 pub(crate) fn likely_produces_noop_updates(&self) -> bool {
307 struct HasJsonbAccess {
308 has: bool,
309 }
310
311 impl ExprVisitor for HasJsonbAccess {
312 fn visit_function_call(&mut self, func_call: &FunctionCall) {
313 if matches!(
314 func_call.func_type(),
315 ExprType::JsonbAccess
316 | ExprType::JsonbAccessStr
317 | ExprType::JsonbExtractPath
318 | ExprType::JsonbExtractPathVariadic
319 | ExprType::JsonbExtractPathText
320 | ExprType::JsonbExtractPathTextVariadic
321 | ExprType::JsonbPathExists
322 | ExprType::JsonbPathMatch
323 | ExprType::JsonbPathQueryArray
324 | ExprType::JsonbPathQueryFirst
325 ) {
326 self.has = true;
327 }
328 }
329 }
330
331 self.exprs.iter().any(|expr| {
332 let mut visitor = HasJsonbAccess { has: false };
336 visitor.visit_expr(expr);
337 visitor.has
338 })
339 }
340}
341
342#[derive(Default)]
345pub struct ProjectBuilder {
346 exprs: Vec<ExprImpl>,
347 exprs_index: HashMap<ExprImpl, usize>,
348}
349
350impl ProjectBuilder {
351 pub fn add_expr(&mut self, expr: &ExprImpl) -> std::result::Result<usize, &'static str> {
354 check_expr_type(expr)?;
355 if let Some(idx) = self.exprs_index.get(expr) {
356 Ok(*idx)
357 } else {
358 let index = self.exprs.len();
359 self.exprs.push(expr.clone());
360 self.exprs_index.insert(expr.clone(), index);
361 Ok(index)
362 }
363 }
364
365 pub fn get_expr(&self, index: usize) -> Option<&ExprImpl> {
366 self.exprs.get(index)
367 }
368
369 pub fn expr_index(&self, expr: &ExprImpl) -> Option<usize> {
370 check_expr_type(expr).ok()?;
371 self.exprs_index.get(expr).copied()
372 }
373
374 pub fn build<PlanRef: GenericPlanRef>(self, input: PlanRef) -> Project<PlanRef> {
376 Project::new(self.exprs, input)
377 }
378
379 pub fn exprs_len(&self) -> usize {
380 self.exprs.len()
381 }
382}
383
384pub struct AliasedExpr<'a> {
386 pub expr: ExprDisplay<'a>,
387 pub alias: Option<String>,
388}
389
390impl fmt::Debug for AliasedExpr<'_> {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 match &self.alias {
393 Some(alias) => write!(f, "{:?} as {}", self.expr, alias),
394 None => write!(f, "{:?}", self.expr),
395 }
396 }
397}