risingwave_frontend/optimizer/plan_visitor/
mod.rs1use paste::paste;
16mod apply_visitor;
17pub use apply_visitor::*;
18mod plan_correlated_id_finder;
19pub use plan_correlated_id_finder::*;
20mod share_parent_counter;
21pub use share_parent_counter::*;
22
23#[cfg(debug_assertions)]
24mod input_ref_validator;
25#[cfg(debug_assertions)]
26pub use input_ref_validator::*;
27
28mod execution_mode_decider;
29pub use execution_mode_decider::*;
30mod temporal_join_validator;
31pub use temporal_join_validator::*;
32mod relation_collector_visitor;
33mod sys_table_visitor;
34pub use relation_collector_visitor::*;
35pub use sys_table_visitor::*;
36mod side_effect_visitor;
37pub use side_effect_visitor::*;
38mod cardinality_visitor;
39pub use cardinality_visitor::*;
40mod jsonb_stream_key_checker;
41pub use jsonb_stream_key_checker::*;
42mod distributed_dml_visitor;
43mod locality_provider_counter;
44mod rw_timestamp_validator;
45mod sole_sys_table_visitor;
46pub use distributed_dml_visitor::*;
47pub use locality_provider_counter::*;
48pub use rw_timestamp_validator::*;
49pub use sole_sys_table_visitor::*;
50
51#[cfg(feature = "datafusion")]
52mod datafusion_execute_checker;
53#[cfg(feature = "datafusion")]
54pub use datafusion_execute_checker::*;
55#[cfg(feature = "datafusion")]
56mod datafusion_plan_converter;
57#[cfg(feature = "datafusion")]
58pub use datafusion_plan_converter::*;
59
60use crate::for_each_convention_all_plan_nodes;
61use crate::optimizer::plan_node::*;
62
63pub trait DefaultBehavior<R> {
65 fn apply(&self, results: impl IntoIterator<Item = R>) -> R;
67}
68
69pub struct Merge<F>(F);
73
74impl<F, R> DefaultBehavior<R> for Merge<F>
75where
76 F: Fn(R, R) -> R,
77 R: Default,
78{
79 fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
80 results.into_iter().reduce(&self.0).unwrap_or_default()
81 }
82}
83
84pub struct DefaultValue;
86
87impl<R> DefaultBehavior<R> for DefaultValue
88where
89 R: Default,
90{
91 fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
92 let _ = results.into_iter().count(); R::default()
94 }
95}
96
97pub trait PlanVisitor<C: ConventionMarker> {
98 type Result;
99 fn visit(&mut self, plan: PlanRef<C>) -> Self::Result;
100}
101
102macro_rules! def_visitor {
104 ({
105 $( $convention:ident, { $( $name:ident ),* }),*
106 }) => {
107 paste! {
108 $(
109 pub trait [<$convention PlanVisitor>] {
112 type Result;
113 type DefaultBehavior: DefaultBehavior<Self::Result>;
114
115 fn default_behavior() -> Self::DefaultBehavior;
117
118 fn [<visit_ $convention:snake>](&mut self, plan: PlanRef<$convention>) -> Self::Result {
119 use risingwave_common::util::recursive::{tracker, Recurse};
120 use crate::session::current::notice_to_user;
121
122 tracker!().recurse(|t| {
123 if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
124 notice_to_user(PLAN_TOO_DEEP_NOTICE);
125 }
126
127 match plan.node_type() {
128 $(
129 [<$convention PlanNodeType>]::[<$convention $name>] => self.[<visit_ $convention:snake _ $name:snake>](plan.downcast_ref::<[<$convention $name>]>().unwrap()),
130 )*
131 }
132 })
133 }
134
135 $(
136 #[doc = "Visit [`" [<$convention $name>] "`] , the function should visit the inputs."]
137 fn [<visit_ $convention:snake _ $name:snake>](&mut self, plan: &[<$convention $name>]) -> Self::Result {
138 let results = plan.inputs().into_iter().map(|input| self.[<visit_ $convention:snake>](input));
139 Self::default_behavior().apply(results)
140 }
141 )*
142
143 }
144
145 impl<V: [<$convention PlanVisitor>]> PlanVisitor<$convention> for V {
146 type Result = V::Result;
147 fn visit(&mut self, plan: PlanRef<$convention>) -> Self::Result {
148 self.[<visit_ $convention:snake>](plan)
149 }
150 }
151 )*
152 }
153 }
154}
155
156for_each_convention_all_plan_nodes! { def_visitor }
157
158macro_rules! impl_has_variant {
159 ( $({$convention:ident $variant_name:ident}),* ) => {
160 paste! {
161 $(
162 pub fn [<has_ $convention:snake _ $variant_name:snake _where>]<P>(plan: PlanRef<$convention>, pred: P) -> bool
163 where
164 P: FnMut(&[<$convention $variant_name>]) -> bool,
165 {
166 struct HasWhere<P> {
167 pred: P,
168 }
169
170 impl<P> [<$convention PlanVisitor>] for HasWhere<P>
171 where
172 P: FnMut(&[<$convention $variant_name>]) -> bool,
173 {
174 type Result = bool;
175 type DefaultBehavior = impl DefaultBehavior<Self::Result>;
176
177 fn default_behavior() -> Self::DefaultBehavior {
178 Merge(|a, b| a | b)
179 }
180
181 fn [<visit_ $convention:snake _ $variant_name:snake>](&mut self, node: &[<$convention $variant_name>]) -> Self::Result {
182 (self.pred)(node)
183 }
184 }
185
186 let mut visitor = HasWhere { pred };
187 visitor.visit(plan)
188 }
189
190 #[allow(dead_code)]
191 pub fn [<has_ $convention:snake _ $variant_name:snake>](plan: PlanRef<$convention>) -> bool {
192 [<has_ $convention:snake _$variant_name:snake _where>](plan, |_| true)
193 }
194 )*
195 }
196 };
197}
198
199impl_has_variant! {
200 {Logical Apply},
201 {Logical MaxOneRow},
202 {Logical OverWindow},
203 {Logical Scan},
204 {Logical Source},
205 {Batch Exchange},
206 {Batch SeqScan},
207 {Batch Source},
208 {Batch Insert},
209 {Batch Delete},
210 {Batch Update}
211}