risingwave_frontend/optimizer/plan_node/generic/
project_set.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::{Field, PROJECTED_ROW_ID_COLUMN_NAME, Schema};
17use risingwave_common::types::DataType;
18
19use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
20use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor};
21use crate::optimizer::optimizer_context::OptimizerContextRef;
22use crate::optimizer::plan_node::BatchPlanRef;
23use crate::optimizer::plan_node::batch::BatchPlanNodeMetadata;
24use crate::optimizer::plan_node::utils::childless_record;
25use crate::optimizer::property::{FunctionalDependencySet, Order};
26use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
27
28/// [`ProjectSet`] projects one row multiple times according to `select_list`.
29///
30/// Different from `Project`, it supports [`TableFunction`](crate::expr::TableFunction)s.
31/// See also [`ProjectSetSelectItem`](risingwave_pb::expr::ProjectSetSelectItem) for examples.
32///
33/// To have a pk, it has a hidden column `projected_row_id` at the beginning. The implementation of
34/// `LogicalProjectSet` is highly similar to [`super::super::LogicalProject`], except for the
35/// additional hidden column.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct ProjectSet<PlanRef> {
38    pub select_list: Vec<ExprImpl>,
39    pub input: PlanRef,
40}
41
42impl<PlanRef> ProjectSet<PlanRef> {
43    pub(crate) fn clone_with_input<OtherPlanRef>(
44        &self,
45        input: OtherPlanRef,
46    ) -> ProjectSet<OtherPlanRef> {
47        ProjectSet {
48            select_list: self.select_list.clone(),
49            input,
50        }
51    }
52
53    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
54        self.select_list = self
55            .select_list
56            .iter()
57            .map(|e| r.rewrite_expr(e.clone()))
58            .collect();
59    }
60
61    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
62        self.select_list.iter().for_each(|e| v.visit_expr(e));
63    }
64
65    pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
66        (self.select_list, self.input)
67    }
68}
69
70impl<PlanRef> DistillUnit for ProjectSet<PlanRef> {
71    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
72        let fields = vec![("select_list", Pretty::debug(&self.select_list))];
73        childless_record(name, fields)
74    }
75}
76
77impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
78    fn schema(&self) -> Schema {
79        let input_schema = self.input.schema();
80        let o2i = self.o2i_col_mapping();
81        let mut fields = vec![Field::with_name(
82            DataType::Int64,
83            PROJECTED_ROW_ID_COLUMN_NAME,
84        )];
85        fields.extend(self.select_list.iter().enumerate().map(|(idx, expr)| {
86            let idx = idx + 1;
87            // Get field info from o2i.
88            let name = match o2i.try_map(idx) {
89                Some(input_idx) => input_schema.fields()[input_idx].name.clone(),
90                None => format!("{:?}", ExprDisplay { expr, input_schema }),
91            };
92            Field::with_name(expr.return_type(), name)
93        }));
94
95        Schema { fields }
96    }
97
98    fn stream_key(&self) -> Option<Vec<usize>> {
99        let i2o = self.i2o_col_mapping();
100        let mut pk = self
101            .input
102            .stream_key()?
103            .iter()
104            .map(|pk_col| i2o.try_map(*pk_col))
105            .collect::<Option<Vec<_>>>()
106            .unwrap_or_default();
107        // add `projected_row_id` to pk
108        pk.push(0);
109        Some(pk)
110    }
111
112    fn ctx(&self) -> OptimizerContextRef {
113        self.input.ctx()
114    }
115
116    fn functional_dependency(&self) -> FunctionalDependencySet {
117        let i2o = self.i2o_col_mapping();
118        i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
119    }
120}
121
122impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
123    /// Gets the Mapping of columnIndex from output column index to input column index
124    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
125        let input_len = self.input.schema().len();
126        let mut map = vec![None; 1 + self.select_list.len()];
127        for (i, item) in self.select_list.iter().enumerate() {
128            if let ExprImpl::InputRef(input) = item {
129                map[1 + i] = Some(input.index())
130            }
131        }
132        ColIndexMapping::new(map, input_len)
133    }
134
135    /// Gets the Mapping of columnIndex from input column index to output column index,if a input
136    /// column corresponds more than one out columns, mapping to any one
137    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
138        let input_len = self.input.schema().len();
139        let mut map = vec![None; input_len];
140        for (i, item) in self.select_list.iter().enumerate() {
141            if let ExprImpl::InputRef(input) = item {
142                map[input.index()] = Some(1 + i)
143            }
144        }
145        ColIndexMapping::new(map, 1 + self.select_list.len())
146    }
147}
148
149impl ProjectSet<BatchPlanRef> {
150    /// Map the order of the input to use the updated indices
151    pub fn get_out_column_index_order(&self) -> Order {
152        self.i2o_col_mapping()
153            .rewrite_provided_order(self.input.order())
154    }
155}