risingwave_frontend/optimizer/plan_visitor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use paste::paste;
16mod apply_visitor;
17pub use apply_visitor::*;
18mod plan_correlated_id_finder;
19pub use plan_correlated_id_finder::*;
20mod share_parent_counter;
21pub use share_parent_counter::*;
22
23#[cfg(debug_assertions)]
24mod input_ref_validator;
25#[cfg(debug_assertions)]
26pub use input_ref_validator::*;
27
28mod execution_mode_decider;
29pub use execution_mode_decider::*;
30mod temporal_join_validator;
31pub use temporal_join_validator::*;
32mod relation_collector_visitor;
33mod sys_table_visitor;
34pub use relation_collector_visitor::*;
35pub use sys_table_visitor::*;
36mod side_effect_visitor;
37pub use side_effect_visitor::*;
38mod cardinality_visitor;
39pub use cardinality_visitor::*;
40mod jsonb_stream_key_checker;
41pub use jsonb_stream_key_checker::*;
42mod distributed_dml_visitor;
43mod locality_provider_counter;
44mod rw_timestamp_validator;
45mod sole_sys_table_visitor;
46pub use distributed_dml_visitor::*;
47pub use locality_provider_counter::*;
48pub use rw_timestamp_validator::*;
49pub use sole_sys_table_visitor::*;
50mod iceberg_scan_detector;
51pub use iceberg_scan_detector::*;
52
53#[cfg(feature = "datafusion")]
54mod datafusion_plan_converter;
55#[cfg(feature = "datafusion")]
56pub use datafusion_plan_converter::*;
57
58use crate::for_each_convention_all_plan_nodes;
59use crate::optimizer::plan_node::*;
60
61/// The behavior for the default implementations of `visit_xxx`.
62pub trait DefaultBehavior<R> {
63    /// Apply this behavior to the plan node with the given results.
64    fn apply(&self, results: impl IntoIterator<Item = R>) -> R;
65}
66
67/// Visit all input nodes, merge the results with a function.
68/// - If there's no input node, return the default value of the result type.
69/// - If there's only a single input node, directly return its result.
70pub struct Merge<F>(F);
71
72impl<F, R> DefaultBehavior<R> for Merge<F>
73where
74    F: Fn(R, R) -> R,
75    R: Default,
76{
77    fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
78        results.into_iter().reduce(&self.0).unwrap_or_default()
79    }
80}
81
82/// Visit all input nodes, return the default value of the result type.
83pub struct DefaultValue;
84
85impl<R> DefaultBehavior<R> for DefaultValue
86where
87    R: Default,
88{
89    fn apply(&self, results: impl IntoIterator<Item = R>) -> R {
90        let _ = results.into_iter().count(); // consume the iterator
91        R::default()
92    }
93}
94
95pub trait PlanVisitor<C: ConventionMarker> {
96    type Result;
97    fn visit(&mut self, plan: PlanRef<C>) -> Self::Result;
98}
99
100/// Define `PlanVisitor` trait.
101macro_rules! def_visitor {
102    ({
103        $( $convention:ident, { $( $name:ident ),* }),*
104    }) => {
105        paste! {
106            $(
107                /// The visitor for plan nodes. visit all inputs and return the ret value of the left most input,
108                /// and leaf node returns `R::default()`
109                pub trait [<$convention  PlanVisitor>] {
110                    type Result;
111                    type DefaultBehavior: DefaultBehavior<Self::Result>;
112
113                    /// The behavior for the default implementations of `visit_xxx`.
114                    fn default_behavior() -> Self::DefaultBehavior;
115
116                    fn [<visit_ $convention:snake>](&mut self, plan: PlanRef<$convention>) -> Self::Result {
117                        use risingwave_common::util::recursive::{tracker, Recurse};
118                        use crate::session::current::notice_to_user;
119
120                        tracker!().recurse(|t| {
121                            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
122                                notice_to_user(PLAN_TOO_DEEP_NOTICE);
123                            }
124
125                            match plan.node_type() {
126                                $(
127                                    [<$convention PlanNodeType>]::[<$convention $name>] => self.[<visit_ $convention:snake _ $name:snake>](plan.downcast_ref::<[<$convention $name>]>().unwrap()),
128                                )*
129                            }
130                        })
131                    }
132
133                    $(
134                        #[doc = "Visit [`" [<$convention $name>] "`] , the function should visit the inputs."]
135                        fn [<visit_ $convention:snake _ $name:snake>](&mut self, plan: &[<$convention $name>]) -> Self::Result {
136                            let results = plan.inputs().into_iter().map(|input| self.[<visit_ $convention:snake>](input));
137                            Self::default_behavior().apply(results)
138                        }
139                    )*
140
141                }
142
143                impl<V: [<$convention  PlanVisitor>]> PlanVisitor<$convention> for V {
144                    type Result = V::Result;
145                    fn visit(&mut self, plan: PlanRef<$convention>) -> Self::Result {
146                        self.[<visit_ $convention:snake>](plan)
147                    }
148                }
149            )*
150        }
151    }
152}
153
154for_each_convention_all_plan_nodes! { def_visitor }
155
156macro_rules! impl_has_variant {
157    ( $({$convention:ident $variant_name:ident}),* ) => {
158        paste! {
159            $(
160                pub fn [<has_ $convention:snake _ $variant_name:snake _where>]<P>(plan: PlanRef<$convention>, pred: P) -> bool
161                where
162                    P: FnMut(&[<$convention $variant_name>]) -> bool,
163                {
164                    struct HasWhere<P> {
165                        pred: P,
166                    }
167
168                    impl<P> [<$convention PlanVisitor>] for HasWhere<P>
169                    where
170                        P: FnMut(&[<$convention $variant_name>]) -> bool,
171                    {
172                        type Result = bool;
173                        type DefaultBehavior = impl DefaultBehavior<Self::Result>;
174
175                        fn default_behavior() -> Self::DefaultBehavior {
176                            Merge(|a, b| a | b)
177                        }
178
179                        fn [<visit_ $convention:snake _ $variant_name:snake>](&mut self, node: &[<$convention $variant_name>]) -> Self::Result {
180                            (self.pred)(node)
181                        }
182                    }
183
184                    let mut visitor = HasWhere { pred };
185                    visitor.visit(plan)
186                }
187
188                #[allow(dead_code)]
189                pub fn [<has_ $convention:snake _ $variant_name:snake>](plan: PlanRef<$convention>) -> bool {
190                    [<has_ $convention:snake _$variant_name:snake _where>](plan, |_| true)
191                }
192            )*
193        }
194    };
195}
196
197impl_has_variant! {
198    {Logical Apply},
199    {Logical MaxOneRow},
200    {Logical OverWindow},
201    {Logical Scan},
202    {Logical Source},
203    {Batch Exchange},
204    {Batch SeqScan},
205    {Batch Source},
206    {Batch Insert},
207    {Batch Delete},
208    {Batch Update}
209}