risingwave_frontend/optimizer/plan_node/
batch_project.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::ProjectNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::expr::ExprNode;
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24    ToDistributedBatch, generic,
25};
26use crate::error::Result;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::utils::ColIndexMappingRewriteExt;
31
32/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
33/// rows
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchProject {
36    pub base: PlanBase<Batch>,
37    core: generic::Project<PlanRef>,
38}
39
40impl BatchProject {
41    pub fn new(core: generic::Project<PlanRef>) -> Self {
42        let distribution = core
43            .i2o_col_mapping()
44            .rewrite_provided_distribution(core.input.distribution());
45        let orders = core
46            .input
47            .orders()
48            .into_iter()
49            .map(|order| core.i2o_col_mapping().rewrite_provided_order(&order))
50            .collect();
51        let base = PlanBase::new_batch_with_core_and_orders(&core, distribution, orders);
52        BatchProject { base, core }
53    }
54
55    pub fn core(&self) -> &generic::Project<PlanRef> {
56        &self.core
57    }
58
59    pub fn exprs(&self) -> &Vec<ExprImpl> {
60        &self.core.exprs
61    }
62}
63
64impl Distill for BatchProject {
65    fn distill<'a>(&self) -> XmlNode<'a> {
66        childless_record("BatchProject", self.core.fields_pretty(self.schema()))
67    }
68}
69
70impl PlanTreeNodeUnary<Batch> for BatchProject {
71    fn input(&self) -> PlanRef {
72        self.core.input.clone()
73    }
74
75    fn clone_with_input(&self, input: PlanRef) -> Self {
76        let mut core = self.core.clone();
77        core.input = input;
78        Self::new(core)
79    }
80}
81
82impl_plan_tree_node_for_unary! { Batch, BatchProject }
83
84impl ToDistributedBatch for BatchProject {
85    fn to_distributed(&self) -> Result<PlanRef> {
86        let new_input = self.input().to_distributed()?;
87        Ok(self.clone_with_input(new_input).into())
88    }
89}
90
91impl ToBatchPb for BatchProject {
92    fn to_batch_prost_body(&self) -> NodeBody {
93        let select_list = self
94            .core
95            .exprs
96            .iter()
97            .map(|expr| expr.to_expr_proto())
98            .collect::<Vec<ExprNode>>();
99        NodeBody::Project(ProjectNode { select_list })
100    }
101}
102
103impl ToLocalBatch for BatchProject {
104    fn to_local(&self) -> Result<PlanRef> {
105        let new_input = self.input().to_local()?;
106        Ok(self.clone_with_input(new_input).into())
107    }
108}
109
110impl ExprRewritable<Batch> for BatchProject {
111    fn has_rewritable_expr(&self) -> bool {
112        true
113    }
114
115    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
116        let mut core = self.core.clone();
117        core.rewrite_exprs(r);
118        Self::new(core).into()
119    }
120}
121
122impl ExprVisitable for BatchProject {
123    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
124        self.core.visit_exprs(v);
125    }
126}