risingwave_frontend/optimizer/plan_node/generic/
project.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17
18use fixedbitset::FixedBitSet;
19use pretty_xmlish::{Pretty, StrAssocArr};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use super::{GenericPlanNode, GenericPlanRef};
24use crate::expr::{
25    Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
26    assert_input_ref,
27};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::FunctionalDependencySet;
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
31
32fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
33    if expr.has_subquery() {
34        return Err("subquery");
35    }
36    if expr.has_agg_call() {
37        return Err("aggregate function");
38    }
39    if expr.has_table_function() {
40        return Err("table function");
41    }
42    if expr.has_window_function() {
43        return Err("window function");
44    }
45    Ok(())
46}
47
48/// [`Project`] computes a set of expressions from its input relation.
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50#[allow(clippy::manual_non_exhaustive)]
51pub struct Project<PlanRef> {
52    pub exprs: Vec<ExprImpl>,
53    /// Mapping from expr index to field name. May not contain all exprs.
54    pub field_names: BTreeMap<usize, String>,
55    pub input: PlanRef,
56    // we need some check when construct the `Project::new`
57    _private: (),
58}
59
60impl<PlanRef> Project<PlanRef> {
61    pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> Project<OtherPlanRef> {
62        Project {
63            exprs: self.exprs.clone(),
64            field_names: self.field_names.clone(),
65            input,
66            _private: (),
67        }
68    }
69
70    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
71        self.exprs = self
72            .exprs
73            .iter()
74            .map(|e| r.rewrite_expr(e.clone()))
75            .collect();
76    }
77
78    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
79        self.exprs.iter().for_each(|e| v.visit_expr(e));
80    }
81}
82
83impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
84    fn schema(&self) -> Schema {
85        let o2i = self.o2i_col_mapping();
86        let exprs = &self.exprs;
87        let input_schema = self.input.schema();
88        let ctx = self.ctx();
89        let fields = exprs
90            .iter()
91            .enumerate()
92            .map(|(i, expr)| {
93                // Get field info from o2i.
94                let name = match o2i.try_map(i) {
95                    Some(input_idx) => {
96                        if let Some(name) = self.field_names.get(&i) {
97                            name.clone()
98                        } else {
99                            input_schema.fields()[input_idx].name.clone()
100                        }
101                    }
102                    None => match expr {
103                        ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {
104                            format!("{:?}", ExprDisplay { expr, input_schema })
105                        }
106                        _ => {
107                            if let Some(name) = self.field_names.get(&i) {
108                                name.clone()
109                            } else {
110                                format!("$expr{}", ctx.next_expr_display_id())
111                            }
112                        }
113                    },
114                };
115                Field::with_name(expr.return_type(), name)
116            })
117            .collect();
118        Schema { fields }
119    }
120
121    fn stream_key(&self) -> Option<Vec<usize>> {
122        let i2o = self.i2o_col_mapping();
123        self.input
124            .stream_key()?
125            .iter()
126            .map(|pk_col| i2o.try_map(*pk_col))
127            .collect::<Option<Vec<_>>>()
128    }
129
130    fn ctx(&self) -> OptimizerContextRef {
131        self.input.ctx()
132    }
133
134    fn functional_dependency(&self) -> FunctionalDependencySet {
135        let i2o = self.i2o_col_mapping();
136        i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
137    }
138}
139
140impl<PlanRef: GenericPlanRef> Project<PlanRef> {
141    pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
142        for expr in &exprs {
143            assert_input_ref!(expr, input.schema().fields().len());
144            check_expr_type(expr)
145                .map_err(|expr| format!("{expr} should not in Project operator"))
146                .unwrap();
147        }
148        Project {
149            exprs,
150            field_names: Default::default(),
151            input,
152            _private: (),
153        }
154    }
155
156    /// Creates a `Project` which select some columns from the input.
157    ///
158    /// `mapping` should maps from `(0..input_fields.len())` to a consecutive range starting from 0.
159    ///
160    /// This is useful in column pruning when we want to add a project to ensure the output schema
161    /// is correct.
162    pub fn with_mapping(input: PlanRef, mapping: ColIndexMapping) -> Self {
163        if mapping.target_size() == 0 {
164            // The mapping is empty, so the parent actually doesn't need the output of the input.
165            // This can happen when the parent node only selects constant expressions.
166            return Self::new(vec![], input);
167        };
168        let mut input_refs = vec![None; mapping.target_size()];
169        for (src, tar) in mapping.mapping_pairs() {
170            assert_eq!(input_refs[tar], None);
171            input_refs[tar] = Some(src);
172        }
173        let input_schema = input.schema();
174        let exprs: Vec<ExprImpl> = input_refs
175            .into_iter()
176            .map(|i| i.unwrap())
177            .map(|i| InputRef::new(i, input_schema.fields()[i].data_type()).into())
178            .collect();
179
180        Self::new(exprs, input)
181    }
182
183    /// Creates a `Project` which select some columns from the input.
184    pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
185        Self::with_out_col_idx(input, out_fields.ones())
186    }
187
188    /// Creates a `Project` which select some columns from the input.
189    pub fn with_out_col_idx(input: PlanRef, out_fields: impl Iterator<Item = usize>) -> Self {
190        let input_schema = input.schema();
191        let exprs = out_fields
192            .map(|index| InputRef::new(index, input_schema[index].data_type()).into())
193            .collect();
194        Self::new(exprs, input)
195    }
196
197    /// Creates a `Project` with an additional `_vnode` column at the end of the schema.
198    pub fn with_vnode_col(input: PlanRef, dist_key: &[usize]) -> Self {
199        let input_fields = input.schema().fields();
200        let mut new_exprs: Vec<_> = input_fields
201            .iter()
202            .enumerate()
203            .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
204            .collect();
205        new_exprs.push(
206            FunctionCall::new(
207                ExprType::Vnode,
208                dist_key
209                    .iter()
210                    .map(|idx| InputRef::new(*idx, input_fields[*idx].data_type()).into())
211                    .collect(),
212            )
213            .expect("Vnode function call should be valid here")
214            .into(),
215        );
216        let vnode_expr_idx = new_exprs.len() - 1;
217
218        let mut new = Self::new(new_exprs, input);
219        new.field_names.insert(vnode_expr_idx, "_vnode".to_owned());
220        new
221    }
222
223    pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
224        (self.exprs, self.input)
225    }
226
227    pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
228        let f = |t| Pretty::debug(&t);
229        let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
230        vec![("exprs", e)]
231    }
232
233    fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
234        self.exprs
235            .iter()
236            .zip_eq_fast(schema.fields().iter())
237            .map(|(expr, field)| AliasedExpr {
238                expr: ExprDisplay {
239                    expr,
240                    input_schema: self.input.schema(),
241                },
242                alias: {
243                    match expr {
244                        ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
245                        _ => Some(field.name.clone()),
246                    }
247                },
248            })
249            .collect()
250    }
251
252    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
253        let exprs = &self.exprs;
254        let input_len = self.input.schema().len();
255        let mut map = vec![None; exprs.len()];
256        for (i, expr) in exprs.iter().enumerate() {
257            if let ExprImpl::InputRef(input) = expr {
258                map[i] = Some(input.index())
259            }
260        }
261        ColIndexMapping::new(map, input_len)
262    }
263
264    /// get the Mapping of columnIndex from input column index to output column index,if a input
265    /// column corresponds more than one out columns, mapping to any one
266    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
267        let exprs = &self.exprs;
268        let input_len = self.input.schema().len();
269        let mut map = vec![None; input_len];
270        for (i, expr) in exprs.iter().enumerate() {
271            if let ExprImpl::InputRef(input) = expr {
272                map[input.index()] = Some(i)
273            }
274        }
275        ColIndexMapping::new(map, exprs.len())
276    }
277
278    pub fn is_all_inputref(&self) -> bool {
279        self.exprs
280            .iter()
281            .all(|expr| matches!(expr, ExprImpl::InputRef(_)))
282    }
283
284    pub fn is_identity(&self) -> bool {
285        self.exprs.len() == self.input.schema().len()
286        && self
287            .exprs
288            .iter()
289            .zip_eq_fast(self.input.schema().fields())
290            .enumerate()
291            .all(|(i, (expr, field))| {
292                matches!(expr, ExprImpl::InputRef(input_ref) if **input_ref == InputRef::new(i, field.data_type()))
293            })
294    }
295
296    pub fn try_as_projection(&self) -> Option<Vec<usize>> {
297        self.exprs
298            .iter()
299            .map(|expr| match expr {
300                ExprImpl::InputRef(input_ref) => Some(input_ref.index),
301                _ => None,
302            })
303            .collect::<Option<Vec<_>>>()
304    }
305
306    pub(crate) fn likely_produces_noop_updates(&self) -> bool {
307        struct HasJsonbAccess {
308            has: bool,
309        }
310
311        impl ExprVisitor for HasJsonbAccess {
312            fn visit_function_call(&mut self, func_call: &FunctionCall) {
313                if matches!(
314                    func_call.func_type(),
315                    ExprType::JsonbAccess
316                        | ExprType::JsonbAccessStr
317                        | ExprType::JsonbExtractPath
318                        | ExprType::JsonbExtractPathVariadic
319                        | ExprType::JsonbExtractPathText
320                        | ExprType::JsonbExtractPathTextVariadic
321                        | ExprType::JsonbPathExists
322                        | ExprType::JsonbPathMatch
323                        | ExprType::JsonbPathQueryArray
324                        | ExprType::JsonbPathQueryFirst
325                ) {
326                    self.has = true;
327                }
328            }
329        }
330
331        self.exprs.iter().any(|expr| {
332            // When there's a jsonb access in the `Project`, it's very likely that the query is
333            // extracting some fields from a jsonb payload column. In this case, a change from the
334            // input jsonb payload may not change the output of the `Project`.
335            let mut visitor = HasJsonbAccess { has: false };
336            visitor.visit_expr(expr);
337            visitor.has
338        })
339    }
340}
341
342/// Construct a `Project` and dedup expressions.
343/// expressions
344#[derive(Default)]
345pub struct ProjectBuilder {
346    exprs: Vec<ExprImpl>,
347    exprs_index: HashMap<ExprImpl, usize>,
348}
349
350impl ProjectBuilder {
351    /// add an expression to the `LogicalProject` and return the column index of the project's
352    /// output
353    pub fn add_expr(&mut self, expr: &ExprImpl) -> std::result::Result<usize, &'static str> {
354        check_expr_type(expr)?;
355        if let Some(idx) = self.exprs_index.get(expr) {
356            Ok(*idx)
357        } else {
358            let index = self.exprs.len();
359            self.exprs.push(expr.clone());
360            self.exprs_index.insert(expr.clone(), index);
361            Ok(index)
362        }
363    }
364
365    pub fn get_expr(&self, index: usize) -> Option<&ExprImpl> {
366        self.exprs.get(index)
367    }
368
369    pub fn expr_index(&self, expr: &ExprImpl) -> Option<usize> {
370        check_expr_type(expr).ok()?;
371        self.exprs_index.get(expr).copied()
372    }
373
374    /// build the `LogicalProject` from `LogicalProjectBuilder`
375    pub fn build<PlanRef: GenericPlanRef>(self, input: PlanRef) -> Project<PlanRef> {
376        Project::new(self.exprs, input)
377    }
378
379    pub fn exprs_len(&self) -> usize {
380        self.exprs.len()
381    }
382}
383
384/// Auxiliary struct for displaying `expr AS alias`
385pub struct AliasedExpr<'a> {
386    pub expr: ExprDisplay<'a>,
387    pub alias: Option<String>,
388}
389
390impl fmt::Debug for AliasedExpr<'_> {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        match &self.alias {
393            Some(alias) => write!(f, "{:?} as {}", self.expr, alias),
394            None => write!(f, "{:?}", self.expr),
395        }
396    }
397}