risingwave_frontend/optimizer/plan_node/generic/
project.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17
18use fixedbitset::FixedBitSet;
19use pretty_xmlish::{Pretty, StrAssocArr};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use super::{GenericPlanNode, GenericPlanRef};
24use crate::expr::{
25    Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
26    assert_input_ref,
27};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::FunctionalDependencySet;
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
31
32fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
33    if expr.has_subquery() {
34        return Err("subquery");
35    }
36    if expr.has_agg_call() {
37        return Err("aggregate function");
38    }
39    if expr.has_table_function() {
40        return Err("table function");
41    }
42    if expr.has_window_function() {
43        return Err("window function");
44    }
45    Ok(())
46}
47
48/// [`Project`] computes a set of expressions from its input relation.
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50#[allow(clippy::manual_non_exhaustive)]
51pub struct Project<PlanRef> {
52    pub exprs: Vec<ExprImpl>,
53    /// Mapping from expr index to field name. May not contain all exprs.
54    pub field_names: BTreeMap<usize, String>,
55    pub input: PlanRef,
56    // we need some check when construct the `Project::new`
57    _private: (),
58}
59
60impl<PlanRef> Project<PlanRef> {
61    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
62        self.exprs = self
63            .exprs
64            .iter()
65            .map(|e| r.rewrite_expr(e.clone()))
66            .collect();
67    }
68
69    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
70        self.exprs.iter().for_each(|e| v.visit_expr(e));
71    }
72}
73
74impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
75    fn schema(&self) -> Schema {
76        let o2i = self.o2i_col_mapping();
77        let exprs = &self.exprs;
78        let input_schema = self.input.schema();
79        let ctx = self.ctx();
80        let fields = exprs
81            .iter()
82            .enumerate()
83            .map(|(i, expr)| {
84                // Get field info from o2i.
85                let name = match o2i.try_map(i) {
86                    Some(input_idx) => {
87                        if let Some(name) = self.field_names.get(&i) {
88                            name.clone()
89                        } else {
90                            input_schema.fields()[input_idx].name.clone()
91                        }
92                    }
93                    None => match expr {
94                        ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {
95                            format!("{:?}", ExprDisplay { expr, input_schema })
96                        }
97                        _ => {
98                            if let Some(name) = self.field_names.get(&i) {
99                                name.clone()
100                            } else {
101                                format!("$expr{}", ctx.next_expr_display_id())
102                            }
103                        }
104                    },
105                };
106                Field::with_name(expr.return_type(), name)
107            })
108            .collect();
109        Schema { fields }
110    }
111
112    fn stream_key(&self) -> Option<Vec<usize>> {
113        let i2o = self.i2o_col_mapping();
114        self.input
115            .stream_key()?
116            .iter()
117            .map(|pk_col| i2o.try_map(*pk_col))
118            .collect::<Option<Vec<_>>>()
119    }
120
121    fn ctx(&self) -> OptimizerContextRef {
122        self.input.ctx()
123    }
124
125    fn functional_dependency(&self) -> FunctionalDependencySet {
126        let i2o = self.i2o_col_mapping();
127        i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
128    }
129}
130
131impl<PlanRef: GenericPlanRef> Project<PlanRef> {
132    pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
133        for expr in &exprs {
134            assert_input_ref!(expr, input.schema().fields().len());
135            check_expr_type(expr)
136                .map_err(|expr| format!("{expr} should not in Project operator"))
137                .unwrap();
138        }
139        Project {
140            exprs,
141            field_names: Default::default(),
142            input,
143            _private: (),
144        }
145    }
146
147    /// Creates a `Project` which select some columns from the input.
148    ///
149    /// `mapping` should maps from `(0..input_fields.len())` to a consecutive range starting from 0.
150    ///
151    /// This is useful in column pruning when we want to add a project to ensure the output schema
152    /// is correct.
153    pub fn with_mapping(input: PlanRef, mapping: ColIndexMapping) -> Self {
154        if mapping.target_size() == 0 {
155            // The mapping is empty, so the parent actually doesn't need the output of the input.
156            // This can happen when the parent node only selects constant expressions.
157            return Self::new(vec![], input);
158        };
159        let mut input_refs = vec![None; mapping.target_size()];
160        for (src, tar) in mapping.mapping_pairs() {
161            assert_eq!(input_refs[tar], None);
162            input_refs[tar] = Some(src);
163        }
164        let input_schema = input.schema();
165        let exprs: Vec<ExprImpl> = input_refs
166            .into_iter()
167            .map(|i| i.unwrap())
168            .map(|i| InputRef::new(i, input_schema.fields()[i].data_type()).into())
169            .collect();
170
171        Self::new(exprs, input)
172    }
173
174    /// Creates a `Project` which select some columns from the input.
175    pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
176        Self::with_out_col_idx(input, out_fields.ones())
177    }
178
179    /// Creates a `Project` which select some columns from the input.
180    pub fn with_out_col_idx(input: PlanRef, out_fields: impl Iterator<Item = usize>) -> Self {
181        let input_schema = input.schema();
182        let exprs = out_fields
183            .map(|index| InputRef::new(index, input_schema[index].data_type()).into())
184            .collect();
185        Self::new(exprs, input)
186    }
187
188    /// Creates a `Project` with an additional `_vnode` column at the end of the schema.
189    pub fn with_vnode_col(input: PlanRef, dist_key: &[usize]) -> Self {
190        let input_fields = input.schema().fields();
191        let mut new_exprs: Vec<_> = input_fields
192            .iter()
193            .enumerate()
194            .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
195            .collect();
196        new_exprs.push(
197            FunctionCall::new(
198                ExprType::Vnode,
199                dist_key
200                    .iter()
201                    .map(|idx| InputRef::new(*idx, input_fields[*idx].data_type()).into())
202                    .collect(),
203            )
204            .expect("Vnode function call should be valid here")
205            .into(),
206        );
207        let vnode_expr_idx = new_exprs.len() - 1;
208
209        let mut new = Self::new(new_exprs, input);
210        new.field_names.insert(vnode_expr_idx, "_vnode".to_owned());
211        new
212    }
213
214    pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
215        (self.exprs, self.input)
216    }
217
218    pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
219        let f = |t| Pretty::debug(&t);
220        let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
221        vec![("exprs", e)]
222    }
223
224    fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
225        self.exprs
226            .iter()
227            .zip_eq_fast(schema.fields().iter())
228            .map(|(expr, field)| AliasedExpr {
229                expr: ExprDisplay {
230                    expr,
231                    input_schema: self.input.schema(),
232                },
233                alias: {
234                    match expr {
235                        ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
236                        _ => Some(field.name.clone()),
237                    }
238                },
239            })
240            .collect()
241    }
242
243    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
244        let exprs = &self.exprs;
245        let input_len = self.input.schema().len();
246        let mut map = vec![None; exprs.len()];
247        for (i, expr) in exprs.iter().enumerate() {
248            if let ExprImpl::InputRef(input) = expr {
249                map[i] = Some(input.index())
250            }
251        }
252        ColIndexMapping::new(map, input_len)
253    }
254
255    /// get the Mapping of columnIndex from input column index to output column index,if a input
256    /// column corresponds more than one out columns, mapping to any one
257    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
258        let exprs = &self.exprs;
259        let input_len = self.input.schema().len();
260        let mut map = vec![None; input_len];
261        for (i, expr) in exprs.iter().enumerate() {
262            if let ExprImpl::InputRef(input) = expr {
263                map[input.index()] = Some(i)
264            }
265        }
266        ColIndexMapping::new(map, exprs.len())
267    }
268
269    pub fn is_all_inputref(&self) -> bool {
270        self.exprs
271            .iter()
272            .all(|expr| matches!(expr, ExprImpl::InputRef(_)))
273    }
274
275    pub fn is_identity(&self) -> bool {
276        self.exprs.len() == self.input.schema().len()
277        && self
278            .exprs
279            .iter()
280            .zip_eq_fast(self.input.schema().fields())
281            .enumerate()
282            .all(|(i, (expr, field))| {
283                matches!(expr, ExprImpl::InputRef(input_ref) if **input_ref == InputRef::new(i, field.data_type()))
284            })
285    }
286
287    pub fn try_as_projection(&self) -> Option<Vec<usize>> {
288        self.exprs
289            .iter()
290            .map(|expr| match expr {
291                ExprImpl::InputRef(input_ref) => Some(input_ref.index),
292                _ => None,
293            })
294            .collect::<Option<Vec<_>>>()
295    }
296
297    pub(crate) fn likely_produces_noop_updates(&self) -> bool {
298        struct HasJsonbAccess {
299            has: bool,
300        }
301
302        impl ExprVisitor for HasJsonbAccess {
303            fn visit_function_call(&mut self, func_call: &FunctionCall) {
304                if matches!(
305                    func_call.func_type(),
306                    ExprType::JsonbAccess
307                        | ExprType::JsonbAccessStr
308                        | ExprType::JsonbExtractPath
309                        | ExprType::JsonbExtractPathVariadic
310                        | ExprType::JsonbExtractPathText
311                        | ExprType::JsonbExtractPathTextVariadic
312                        | ExprType::JsonbPathExists
313                        | ExprType::JsonbPathMatch
314                        | ExprType::JsonbPathQueryArray
315                        | ExprType::JsonbPathQueryFirst
316                ) {
317                    self.has = true;
318                }
319            }
320        }
321
322        self.exprs.iter().any(|expr| {
323            // When there's a jsonb access in the `Project`, it's very likely that the query is
324            // extracting some fields from a jsonb payload column. In this case, a change from the
325            // input jsonb payload may not change the output of the `Project`.
326            let mut visitor = HasJsonbAccess { has: false };
327            visitor.visit_expr(expr);
328            visitor.has
329        })
330    }
331}
332
333/// Construct a `Project` and dedup expressions.
334/// expressions
335#[derive(Default)]
336pub struct ProjectBuilder {
337    exprs: Vec<ExprImpl>,
338    exprs_index: HashMap<ExprImpl, usize>,
339}
340
341impl ProjectBuilder {
342    /// add an expression to the `LogicalProject` and return the column index of the project's
343    /// output
344    pub fn add_expr(&mut self, expr: &ExprImpl) -> std::result::Result<usize, &'static str> {
345        check_expr_type(expr)?;
346        if let Some(idx) = self.exprs_index.get(expr) {
347            Ok(*idx)
348        } else {
349            let index = self.exprs.len();
350            self.exprs.push(expr.clone());
351            self.exprs_index.insert(expr.clone(), index);
352            Ok(index)
353        }
354    }
355
356    pub fn get_expr(&self, index: usize) -> Option<&ExprImpl> {
357        self.exprs.get(index)
358    }
359
360    pub fn expr_index(&self, expr: &ExprImpl) -> Option<usize> {
361        check_expr_type(expr).ok()?;
362        self.exprs_index.get(expr).copied()
363    }
364
365    /// build the `LogicalProject` from `LogicalProjectBuilder`
366    pub fn build<PlanRef: GenericPlanRef>(self, input: PlanRef) -> Project<PlanRef> {
367        Project::new(self.exprs, input)
368    }
369
370    pub fn exprs_len(&self) -> usize {
371        self.exprs.len()
372    }
373}
374
375/// Auxiliary struct for displaying `expr AS alias`
376pub struct AliasedExpr<'a> {
377    pub expr: ExprDisplay<'a>,
378    pub alias: Option<String>,
379}
380
381impl fmt::Debug for AliasedExpr<'_> {
382    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383        match &self.alias {
384            Some(alias) => write!(f, "{:?} as {}", self.expr, alias),
385            None => write!(f, "{:?}", self.expr),
386        }
387    }
388}