risingwave_frontend/optimizer/plan_node/
batch_project.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::ProjectNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::expr::ExprNode;
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24 ToDistributedBatch, generic,
25};
26use crate::error::Result;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::utils::ColIndexMappingRewriteExt;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchProject {
36 pub base: PlanBase<Batch>,
37 core: generic::Project<PlanRef>,
38}
39
40impl BatchProject {
41 pub fn new(core: generic::Project<PlanRef>) -> Self {
42 let distribution = core
43 .i2o_col_mapping()
44 .rewrite_provided_distribution(core.input.distribution());
45 let order = core
46 .i2o_col_mapping()
47 .rewrite_provided_order(core.input.order());
48
49 let base = PlanBase::new_batch_with_core(&core, distribution, order);
50 BatchProject { base, core }
51 }
52
53 pub fn core(&self) -> &generic::Project<PlanRef> {
54 &self.core
55 }
56
57 pub fn exprs(&self) -> &Vec<ExprImpl> {
58 &self.core.exprs
59 }
60}
61
62impl Distill for BatchProject {
63 fn distill<'a>(&self) -> XmlNode<'a> {
64 childless_record("BatchProject", self.core.fields_pretty(self.schema()))
65 }
66}
67
68impl PlanTreeNodeUnary<Batch> for BatchProject {
69 fn input(&self) -> PlanRef {
70 self.core.input.clone()
71 }
72
73 fn clone_with_input(&self, input: PlanRef) -> Self {
74 let mut core = self.core.clone();
75 core.input = input;
76 Self::new(core)
77 }
78}
79
80impl_plan_tree_node_for_unary! { Batch, BatchProject }
81
82impl ToDistributedBatch for BatchProject {
83 fn to_distributed(&self) -> Result<PlanRef> {
84 let new_input = self.input().to_distributed()?;
85 Ok(self.clone_with_input(new_input).into())
86 }
87}
88
89impl ToBatchPb for BatchProject {
90 fn to_batch_prost_body(&self) -> NodeBody {
91 let select_list = self
92 .core
93 .exprs
94 .iter()
95 .map(|expr| expr.to_expr_proto())
96 .collect::<Vec<ExprNode>>();
97 NodeBody::Project(ProjectNode { select_list })
98 }
99}
100
101impl ToLocalBatch for BatchProject {
102 fn to_local(&self) -> Result<PlanRef> {
103 let new_input = self.input().to_local()?;
104 Ok(self.clone_with_input(new_input).into())
105 }
106}
107
108impl ExprRewritable<Batch> for BatchProject {
109 fn has_rewritable_expr(&self) -> bool {
110 true
111 }
112
113 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
114 let mut core = self.core.clone();
115 core.rewrite_exprs(r);
116 Self::new(core).into()
117 }
118}
119
120impl ExprVisitable for BatchProject {
121 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
122 self.core.visit_exprs(v);
123 }
124}