risingwave_frontend/optimizer/plan_visitor/
mod.rs1use paste::paste;
16mod apply_visitor;
17pub use apply_visitor::*;
18mod plan_correlated_id_finder;
19pub use plan_correlated_id_finder::*;
20mod share_parent_counter;
21pub use share_parent_counter::*;
22
23#[cfg(debug_assertions)]
24mod input_ref_validator;
25#[cfg(debug_assertions)]
26pub use input_ref_validator::*;
27
28mod execution_mode_decider;
29pub use execution_mode_decider::*;
30mod temporal_join_validator;
31pub use temporal_join_validator::*;
32mod relation_collector_visitor;
33mod sys_table_visitor;
34pub use relation_collector_visitor::*;
35pub use sys_table_visitor::*;
36mod side_effect_visitor;
37pub use side_effect_visitor::*;
38mod cardinality_visitor;
39pub use cardinality_visitor::*;
40mod jsonb_stream_key_checker;
41pub use jsonb_stream_key_checker::*;
42mod distributed_dml_visitor;
43mod rw_timestamp_validator;
44pub use distributed_dml_visitor::*;
45pub use rw_timestamp_validator::*;
46
47use crate::for_each_convention_all_plan_nodes;
48use crate::optimizer::plan_node::*;
49
/// Strategy used by the default `visit_*` methods of the generated plan visitors to turn
/// the results of a node's inputs into the result for the node itself.
pub trait DefaultBehavior<R> {
    /// Combines `results` (one item per visited input) into a single `R`.
    ///
    /// `results` may be a lazy iterator, so an implementation that wants every input to be
    /// visited has to consume it.
    fn apply(&self, results: impl IntoIterator<Item = R>) -> R;
}
55
56pub struct Merge<F>(F);
60
61impl<F, R> DefaultBehavior<R> for Merge<F>
62where
63 F: Fn(R, R) -> R,
64 R: Default,
65{
66 fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
67 results.into_iter().reduce(&self.0).unwrap_or_default()
68 }
69}
70
71pub struct DefaultValue;
73
74impl<R> DefaultBehavior<R> for DefaultValue
75where
76 R: Default,
77{
78 fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
79 let _ = results.into_iter().count(); R::default()
81 }
82}
83
84pub trait PlanVisitor<C: ConventionMarker> {
85 type Result;
86 fn visit(&mut self, plan: PlanRef<C>) -> Self::Result;
87}
88
/// Generates, for every plan convention, a `<Convention>PlanVisitor` trait plus a blanket
/// [`PlanVisitor`] impl for all implementors of that trait.
///
/// Expected input: `{ Convention, { NodeA, NodeB, ... }, ... }`, i.e. a convention
/// identifier followed by the braced list of its node names (without the convention prefix).
macro_rules! def_visitor {
    ({
        $( $convention:ident, { $( $name:ident ),* }),*
    }) => {
        paste! {
            $(
                /// Visitor over the plan nodes of one convention.
                ///
                /// Each per-node method defaults to visiting all inputs of the node and
                /// combining their results with [`Self::default_behavior`].
                pub trait [<$convention PlanVisitor>] {
                    /// The value produced by visiting a plan (sub)tree.
                    type Result: Default;
                    /// How the default per-node methods combine the results of a node's inputs.
                    type DefaultBehavior: DefaultBehavior<Self::Result>;

                    /// Returns the behavior used by the default per-node methods.
                    fn default_behavior() -> Self::DefaultBehavior;

                    /// Visits `plan`, dispatching on its node type to the matching per-node method.
                    fn [<visit_ $convention:snake>](&mut self, plan: PlanRef<$convention>) -> Self::Result {
                        use risingwave_common::util::recursive::{tracker, Recurse};
                        use crate::session::current::notice_to_user;

                        tracker!().recurse(|t| {
                            // Send a notice to the user when the tracker reports that the
                            // recursion depth reaches the threshold.
                            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                                notice_to_user(PLAN_TOO_DEEP_NOTICE);
                            }

                            match plan.node_type() {
                                $(
                                    [<$convention PlanNodeType>]::[<$convention $name>] => {
                                        // `unwrap` panics if the node does not downcast to the
                                        // concrete type named by its node-type tag.
                                        self.[<visit_ $convention:snake _ $name:snake>](
                                            plan.downcast_ref::<[<$convention $name>]>().unwrap(),
                                        )
                                    }
                                )*
                            }
                        })
                    }

                    $(
                        #[doc = "Visit [`" [<$convention $name>] "`] , the function should visit the inputs."]
                        fn [<visit_ $convention:snake _ $name:snake>](&mut self, plan: &[<$convention $name>]) -> Self::Result {
                            // The iterator is lazy: inputs are only visited as the default
                            // behavior consumes it.
                            let results = plan.inputs().into_iter().map(|input| self.[<visit_ $convention:snake>](input));
                            Self::default_behavior().apply(results)
                        }
                    )*
                }

                // Every convention-specific visitor is also usable through the generic trait.
                impl<V: [<$convention PlanVisitor>]> PlanVisitor<$convention> for V {
                    type Result = V::Result;

                    fn visit(&mut self, plan: PlanRef<$convention>) -> Self::Result {
                        self.[<visit_ $convention:snake>](plan)
                    }
                }
            )*
        }
    }
}
142
143for_each_convention_all_plan_nodes! { def_visitor }
144
/// For each `{Convention Variant}` pair, generates two free functions:
///
/// - `has_<convention>_<variant>_where(plan, pred)`: visits `plan` and ORs together the
///   results of calling `pred` on the `<Convention><Variant>` nodes it reaches.
/// - `has_<convention>_<variant>(plan)`: the same with a predicate that is always `true`.
///
/// The search does not descend below a `<Convention><Variant>` node: for such a node only
/// the predicate is evaluated and its inputs are not visited.
macro_rules! impl_has_variant {
    ( $({$convention:ident $variant_name:ident}),* ) => {
        paste! {
            $(
                /// Returns `true` if `pred` holds for at least one node of this variant
                /// reached while visiting `plan`.
                pub fn [<has_ $convention:snake _ $variant_name:snake _where>]<P>(plan: PlanRef<$convention>, pred: P) -> bool
                where
                    P: FnMut(&[<$convention $variant_name>]) -> bool,
                {
                    // Local visitor that only customizes the method for the target variant.
                    struct HasWhere<P> {
                        pred: P,
                    }

                    impl<P> [<$convention PlanVisitor>] for HasWhere<P>
                    where
                        P: FnMut(&[<$convention $variant_name>]) -> bool,
                    {
                        type Result = bool;
                        // NOTE(review): `impl Trait` as an associated type presumably relies on
                        // a nightly feature enabled at the crate root; confirm.
                        type DefaultBehavior = impl DefaultBehavior<Self::Result>;

                        fn default_behavior() -> Self::DefaultBehavior {
                            // Non-short-circuiting OR; with no inputs `Merge` yields `false`.
                            Merge(|a, b| a | b)
                        }

                        fn [<visit_ $convention:snake _ $variant_name:snake>](&mut self, node: &[<$convention $variant_name>]) -> Self::Result {
                            // Inputs of the matched node are intentionally not visited here.
                            (self.pred)(node)
                        }
                    }

                    let mut visitor = HasWhere { pred };
                    visitor.visit(plan)
                }

                /// Returns `true` if visiting `plan` reaches at least one node of this variant.
                #[allow(dead_code)]
                pub fn [<has_ $convention:snake _ $variant_name:snake>](plan: PlanRef<$convention>) -> bool {
                    [<has_ $convention:snake _ $variant_name:snake _where>](plan, |_| true)
                }
            )*
        }
    };
}
185
186impl_has_variant! {
187 {Logical Apply},
188 {Logical MaxOneRow},
189 {Logical OverWindow},
190 {Logical Scan},
191 {Logical Source},
192 {Batch Exchange},
193 {Batch SeqScan},
194 {Batch Source},
195 {Batch Insert},
196 {Batch Delete},
197 {Batch Update}
198}