risingwave_frontend/optimizer/plan_rewriter/
plan_cloner.rs1use std::collections::HashMap;
16
17use itertools::Itertools;
18
19use crate::PlanRef;
20use crate::optimizer::PlanRewriter;
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
23
24#[derive(Debug, Clone, Default)]
25pub struct PlanCloner {
26 share_map: HashMap<PlanNodeId, PlanRef>,
29}
30
31impl PlanCloner {
32 pub fn clone_whole_plan(plan: PlanRef) -> PlanRef {
33 let mut plan_cloner = PlanCloner {
34 share_map: Default::default(),
35 };
36 plan_cloner.rewrite(plan)
37 }
38}
39
40impl PlanRewriter for PlanCloner {
41 fn rewrite_logical_share(&mut self, share: &LogicalShare) -> PlanRef {
42 match self.share_map.get(&share.id()) {
45 None => {
46 let new_inputs = share
47 .inputs()
48 .into_iter()
49 .map(|input| self.rewrite(input))
50 .collect_vec();
51 let new_share = share.clone_with_inputs(&new_inputs);
52 self.share_map.insert(share.id(), new_share.clone());
53 new_share
54 }
55 Some(new_share) => new_share.clone(),
56 }
57 }
58
59 fn rewrite_stream_share(&mut self, share: &StreamShare) -> PlanRef {
60 match self.share_map.get(&share.id()) {
63 None => {
64 let new_inputs = share
65 .inputs()
66 .into_iter()
67 .map(|input| self.rewrite(input))
68 .collect_vec();
69 let new_share = share.clone_with_inputs(&new_inputs);
70 self.share_map.insert(share.id(), new_share.clone());
71 new_share
72 }
73 Some(new_share) => new_share.clone(),
74 }
75 }
76}