risingwave_frontend/optimizer/plan_rewriter/
plan_cloner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18
19use crate::PlanRef;
20use crate::optimizer::PlanRewriter;
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
23
24#[derive(Debug, Clone, Default)]
25pub struct PlanCloner {
26    /// Original share node plan id to new share node.
27    /// Rewriter will rewrite all nodes, but we need to keep the shape of the DAG.
28    share_map: HashMap<PlanNodeId, PlanRef>,
29}
30
31impl PlanCloner {
32    pub fn clone_whole_plan(plan: PlanRef) -> PlanRef {
33        let mut plan_cloner = PlanCloner {
34            share_map: Default::default(),
35        };
36        plan_cloner.rewrite(plan)
37    }
38}
39
40impl PlanRewriter for PlanCloner {
41    fn rewrite_logical_share(&mut self, share: &LogicalShare) -> PlanRef {
42        // When we use the plan rewriter, we need to take care of the share operator,
43        // because our plan is a DAG rather than a tree.
44        match self.share_map.get(&share.id()) {
45            None => {
46                let new_inputs = share
47                    .inputs()
48                    .into_iter()
49                    .map(|input| self.rewrite(input))
50                    .collect_vec();
51                let new_share = share.clone_with_inputs(&new_inputs);
52                self.share_map.insert(share.id(), new_share.clone());
53                new_share
54            }
55            Some(new_share) => new_share.clone(),
56        }
57    }
58
59    fn rewrite_stream_share(&mut self, share: &StreamShare) -> PlanRef {
60        // When we use the plan rewriter, we need to take care of the share operator,
61        // because our plan is a DAG rather than a tree.
62        match self.share_map.get(&share.id()) {
63            None => {
64                let new_inputs = share
65                    .inputs()
66                    .into_iter()
67                    .map(|input| self.rewrite(input))
68                    .collect_vec();
69                let new_share = share.clone_with_inputs(&new_inputs);
70                self.share_map.insert(share.id(), new_share.clone());
71                new_share
72            }
73            Some(new_share) => new_share.clone(),
74        }
75    }
76}