// risingwave_frontend/optimizer/plan_node/batch_project_set.rs
use itertools::Itertools;
16use risingwave_pb::batch_plan::ProjectSetNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{BatchPlanRef as PlanRef, ExprRewritable, generic};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26 PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch,
27};
28use crate::utils::ColIndexMappingRewriteExt;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchProjectSet {
32 pub base: PlanBase<Batch>,
33 core: generic::ProjectSet<PlanRef>,
34}
35
36impl BatchProjectSet {
37 pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
38 let distribution = core
39 .i2o_col_mapping()
40 .rewrite_provided_distribution(core.input.distribution());
41
42 let base =
43 PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
44 BatchProjectSet { base, core }
45 }
46}
47
// Invokes `impl_distill_by_unit!` for `BatchProjectSet`, passing the `core`
// field name and the display name "BatchProjectSet".
// NOTE(review): presumably this generates the node's distill (explain output)
// impl by delegating to `core` — confirm against the macro in `super::utils`.
48impl_distill_by_unit!(BatchProjectSet, core, "BatchProjectSet");
49
50impl PlanTreeNodeUnary<Batch> for BatchProjectSet {
51 fn input(&self) -> PlanRef {
52 self.core.input.clone()
53 }
54
55 fn clone_with_input(&self, input: PlanRef) -> Self {
56 let mut core = self.core.clone();
57 core.input = input;
58 Self::new(core)
59 }
60}
61
// Invokes `impl_plan_tree_node_for_unary!` with `Batch` and `BatchProjectSet`.
// NOTE(review): presumably this derives the general plan-tree-node impl from
// the `PlanTreeNodeUnary<Batch>` impl — confirm against the macro definition.
62impl_plan_tree_node_for_unary! { Batch, BatchProjectSet }
63
64impl ToDistributedBatch for BatchProjectSet {
65 fn to_distributed(&self) -> Result<PlanRef> {
66 let new_input = self.input().to_distributed()?;
67 Ok(self.clone_with_input(new_input).into())
68 }
69
70 }
72
73impl ToBatchPb for BatchProjectSet {
74 fn to_batch_prost_body(&self) -> NodeBody {
75 NodeBody::ProjectSet(ProjectSetNode {
76 select_list: self
77 .core
78 .select_list
79 .iter()
80 .map(|select_item| select_item.to_project_set_select_item_proto())
81 .collect_vec(),
82 })
83 }
84}
85
86impl ToLocalBatch for BatchProjectSet {
87 fn to_local(&self) -> Result<PlanRef> {
88 let new_input = self.input().to_local()?;
89 Ok(self.clone_with_input(new_input).into())
90 }
91}
92
93impl ExprRewritable<Batch> for BatchProjectSet {
94 fn has_rewritable_expr(&self) -> bool {
95 true
96 }
97
98 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
99 let mut core = self.core.clone();
100 core.rewrite_exprs(r);
101 Self::new(core).into()
102 }
103}
104
105impl ExprVisitable for BatchProjectSet {
106 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
107 self.core.visit_exprs(v);
108 }
109}