risingwave_frontend/optimizer/plan_node/
batch_project_set.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::batch_plan::ProjectSetNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, generic};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::PlanRef;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{
27    PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch,
28};
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchProjectSet {
33    pub base: PlanBase<Batch>,
34    core: generic::ProjectSet<PlanRef>,
35}
36
37impl BatchProjectSet {
38    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
39        let distribution = core
40            .i2o_col_mapping()
41            .rewrite_provided_distribution(core.input.distribution());
42
43        let base =
44            PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
45        BatchProjectSet { base, core }
46    }
47}
48
49impl_distill_by_unit!(BatchProjectSet, core, "BatchProjectSet");
50
51impl PlanTreeNodeUnary for BatchProjectSet {
52    fn input(&self) -> PlanRef {
53        self.core.input.clone()
54    }
55
56    fn clone_with_input(&self, input: PlanRef) -> Self {
57        let mut core = self.core.clone();
58        core.input = input;
59        Self::new(core)
60    }
61}
62
63impl_plan_tree_node_for_unary! { BatchProjectSet }
64
65impl ToDistributedBatch for BatchProjectSet {
66    fn to_distributed(&self) -> Result<PlanRef> {
67        let new_input = self.input().to_distributed()?;
68        Ok(self.clone_with_input(new_input).into())
69    }
70
71    // TODO: implement to_distributed_with_required like BatchProject
72}
73
74impl ToBatchPb for BatchProjectSet {
75    fn to_batch_prost_body(&self) -> NodeBody {
76        NodeBody::ProjectSet(ProjectSetNode {
77            select_list: self
78                .core
79                .select_list
80                .iter()
81                .map(|select_item| select_item.to_project_set_select_item_proto())
82                .collect_vec(),
83        })
84    }
85}
86
87impl ToLocalBatch for BatchProjectSet {
88    fn to_local(&self) -> Result<PlanRef> {
89        let new_input = self.input().to_local()?;
90        Ok(self.clone_with_input(new_input).into())
91    }
92}
93
94impl ExprRewritable for BatchProjectSet {
95    fn has_rewritable_expr(&self) -> bool {
96        true
97    }
98
99    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
100        let mut core = self.core.clone();
101        core.rewrite_exprs(r);
102        Self::new(core).into()
103    }
104}
105
106impl ExprVisitable for BatchProjectSet {
107    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
108        self.core.visit_exprs(v);
109    }
110}