risingwave_frontend/optimizer/plan_node/
batch_project.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::ProjectNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::expr::ExprNode;
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
24};
25use crate::error::Result;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::ToLocalBatch;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchProject {
35 pub base: PlanBase<Batch>,
36 core: generic::Project<PlanRef>,
37}
38
39impl BatchProject {
40 pub fn new(core: generic::Project<PlanRef>) -> Self {
41 let distribution = core
42 .i2o_col_mapping()
43 .rewrite_provided_distribution(core.input.distribution());
44 let order = core
45 .i2o_col_mapping()
46 .rewrite_provided_order(core.input.order());
47
48 let base = PlanBase::new_batch_with_core(&core, distribution, order);
49 BatchProject { base, core }
50 }
51
52 pub fn as_logical(&self) -> &generic::Project<PlanRef> {
53 &self.core
54 }
55
56 pub fn exprs(&self) -> &Vec<ExprImpl> {
57 &self.core.exprs
58 }
59}
60
61impl Distill for BatchProject {
62 fn distill<'a>(&self) -> XmlNode<'a> {
63 childless_record("BatchProject", self.core.fields_pretty(self.schema()))
64 }
65}
66
67impl PlanTreeNodeUnary for BatchProject {
68 fn input(&self) -> PlanRef {
69 self.core.input.clone()
70 }
71
72 fn clone_with_input(&self, input: PlanRef) -> Self {
73 let mut core = self.core.clone();
74 core.input = input;
75 Self::new(core)
76 }
77}
78
79impl_plan_tree_node_for_unary! { BatchProject }
80
81impl ToDistributedBatch for BatchProject {
82 fn to_distributed(&self) -> Result<PlanRef> {
83 let new_input = self.input().to_distributed()?;
84 Ok(self.clone_with_input(new_input).into())
85 }
86}
87
88impl ToBatchPb for BatchProject {
89 fn to_batch_prost_body(&self) -> NodeBody {
90 let select_list = self
91 .core
92 .exprs
93 .iter()
94 .map(|expr| expr.to_expr_proto())
95 .collect::<Vec<ExprNode>>();
96 NodeBody::Project(ProjectNode { select_list })
97 }
98}
99
100impl ToLocalBatch for BatchProject {
101 fn to_local(&self) -> Result<PlanRef> {
102 let new_input = self.input().to_local()?;
103 Ok(self.clone_with_input(new_input).into())
104 }
105}
106
107impl ExprRewritable for BatchProject {
108 fn has_rewritable_expr(&self) -> bool {
109 true
110 }
111
112 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
113 let mut core = self.core.clone();
114 core.rewrite_exprs(r);
115 Self::new(core).into()
116 }
117}
118
119impl ExprVisitable for BatchProject {
120 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
121 self.core.visit_exprs(v);
122 }
123}