risingwave_frontend/optimizer/plan_node/
batch_project_set.rs1use itertools::Itertools;
16use risingwave_pb::batch_plan::ProjectSetNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, generic};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::PlanRef;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{
27 PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch,
28};
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchProjectSet {
33 pub base: PlanBase<Batch>,
34 core: generic::ProjectSet<PlanRef>,
35}
36
37impl BatchProjectSet {
38 pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
39 let distribution = core
40 .i2o_col_mapping()
41 .rewrite_provided_distribution(core.input.distribution());
42
43 let base =
44 PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
45 BatchProjectSet { base, core }
46 }
47}
48
49impl_distill_by_unit!(BatchProjectSet, core, "BatchProjectSet");
50
51impl PlanTreeNodeUnary for BatchProjectSet {
52 fn input(&self) -> PlanRef {
53 self.core.input.clone()
54 }
55
56 fn clone_with_input(&self, input: PlanRef) -> Self {
57 let mut core = self.core.clone();
58 core.input = input;
59 Self::new(core)
60 }
61}
62
63impl_plan_tree_node_for_unary! { BatchProjectSet }
64
65impl ToDistributedBatch for BatchProjectSet {
66 fn to_distributed(&self) -> Result<PlanRef> {
67 let new_input = self.input().to_distributed()?;
68 Ok(self.clone_with_input(new_input).into())
69 }
70
71 }
73
74impl ToBatchPb for BatchProjectSet {
75 fn to_batch_prost_body(&self) -> NodeBody {
76 NodeBody::ProjectSet(ProjectSetNode {
77 select_list: self
78 .core
79 .select_list
80 .iter()
81 .map(|select_item| select_item.to_project_set_select_item_proto())
82 .collect_vec(),
83 })
84 }
85}
86
87impl ToLocalBatch for BatchProjectSet {
88 fn to_local(&self) -> Result<PlanRef> {
89 let new_input = self.input().to_local()?;
90 Ok(self.clone_with_input(new_input).into())
91 }
92}
93
94impl ExprRewritable for BatchProjectSet {
95 fn has_rewritable_expr(&self) -> bool {
96 true
97 }
98
99 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
100 let mut core = self.core.clone();
101 core.rewrite_exprs(r);
102 Self::new(core).into()
103 }
104}
105
106impl ExprVisitable for BatchProjectSet {
107 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
108 self.core.visit_exprs(v);
109 }
110}