risingwave_frontend/optimizer/plan_node/
batch_project_set.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::batch_plan::ProjectSetNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{BatchPlanRef as PlanRef, ExprRewritable, generic};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26    PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch,
27};
28use crate::utils::ColIndexMappingRewriteExt;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchProjectSet {
32    pub base: PlanBase<Batch>,
33    core: generic::ProjectSet<PlanRef>,
34}
35
36impl BatchProjectSet {
37    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
38        let distribution = core
39            .i2o_col_mapping()
40            .rewrite_provided_distribution(core.input.distribution());
41
42        let base =
43            PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
44        BatchProjectSet { base, core }
45    }
46}
47
48impl_distill_by_unit!(BatchProjectSet, core, "BatchProjectSet");
49
50impl PlanTreeNodeUnary<Batch> for BatchProjectSet {
51    fn input(&self) -> PlanRef {
52        self.core.input.clone()
53    }
54
55    fn clone_with_input(&self, input: PlanRef) -> Self {
56        let mut core = self.core.clone();
57        core.input = input;
58        Self::new(core)
59    }
60}
61
62impl_plan_tree_node_for_unary! { Batch, BatchProjectSet }
63
64impl ToDistributedBatch for BatchProjectSet {
65    fn to_distributed(&self) -> Result<PlanRef> {
66        let new_input = self.input().to_distributed()?;
67        Ok(self.clone_with_input(new_input).into())
68    }
69
70    // TODO: implement to_distributed_with_required like BatchProject
71}
72
73impl ToBatchPb for BatchProjectSet {
74    fn to_batch_prost_body(&self) -> NodeBody {
75        NodeBody::ProjectSet(ProjectSetNode {
76            select_list: self
77                .core
78                .select_list
79                .iter()
80                .map(|select_item| select_item.to_project_set_select_item_proto())
81                .collect_vec(),
82        })
83    }
84}
85
86impl ToLocalBatch for BatchProjectSet {
87    fn to_local(&self) -> Result<PlanRef> {
88        let new_input = self.input().to_local()?;
89        Ok(self.clone_with_input(new_input).into())
90    }
91}
92
93impl ExprRewritable<Batch> for BatchProjectSet {
94    fn has_rewritable_expr(&self) -> bool {
95        true
96    }
97
98    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
99        let mut core = self.core.clone();
100        core.rewrite_exprs(r);
101        Self::new(core).into()
102    }
103}
104
105impl ExprVisitable for BatchProjectSet {
106    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
107        self.core.visit_exprs(v);
108    }
109}