risingwave_frontend/optimizer/plan_node/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Defines all kinds of nodes in the plan tree; each node represents a relational expression.
//!
//! We use an immutable tree structure: every node is immutable and cannot be modified after it
//! has been created. If you want to modify a node, such as rewriting the expression in a
//! `ProjectNode` or changing a node's input, you need to create a new node. We use `Rc` as the
//! node reference, and a node only stores references to its inputs, so changing a node only
//! requires creating one new node rather than copying the entire sub-tree.
//!
//! So when you want to add a new node, make sure that:
//! - each field in the node struct is private
//! - the node is constructed through a unified `new()` function; if there are multiple ways to
//!   construct it, make sure they behave consistently
//! - all fields are valued at construction, so the derivation of properties is finished in the
//!   `new()` function (an illustrative sketch follows).
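//!
//! A minimal sketch of a new logical node following these conventions. The node name, its
//! fields, and the `derive_plan_base` helper are hypothetical and only illustrate the shape:
//!
//! ```ignore
//! pub struct LogicalFoo {
//!     // All fields are private; readers go through accessor methods.
//!     base: PlanBase<Logical>,
//!     input: PlanRef,
//!     exprs: Vec<ExprImpl>,
//! }
//!
//! impl LogicalFoo {
//!     /// The unified construction entry: all derived properties (schema, stream key,
//!     /// functional dependencies, ...) are computed here, so a constructed node is
//!     /// always fully valued.
//!     pub fn new(input: PlanRef, exprs: Vec<ExprImpl>) -> Self {
//!         let base = derive_plan_base(&input, &exprs); // hypothetical helper
//!         Self { base, input, exprs }
//!     }
//! }
//! ```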
29
30use std::collections::HashMap;
31use std::fmt::Debug;
32use std::hash::Hash;
33use std::ops::Deref;
34use std::rc::Rc;
35
36use downcast_rs::{Downcast, impl_downcast};
37use dyn_clone::DynClone;
38use itertools::Itertools;
39use paste::paste;
40use petgraph::dot::{Config, Dot};
41use petgraph::graph::Graph;
42use pretty_xmlish::{Pretty, PrettyConfig};
43use risingwave_common::catalog::Schema;
44use risingwave_common::util::recursive::{self, Recurse};
45use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
46use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
47use serde::Serialize;
48use smallvec::SmallVec;
49
50use self::batch::BatchPlanRef;
51use self::generic::{GenericPlanRef, PhysicalPlanRef};
52use self::stream::StreamPlanRef;
53use self::utils::Distill;
54use super::property::{
55    Distribution, FunctionalDependencySet, MonotonicityMap, Order, WatermarkColumns,
56};
57use crate::error::{ErrorCode, Result};
58use crate::optimizer::ExpressionSimplifyRewriter;
59use crate::session::current::notice_to_user;
60use crate::utils::{PrettySerde, build_graph_from_pretty};
61
62/// A marker trait for different conventions, used for enforcing type safety.
63///
64/// Implementors are [`Logical`], [`Batch`], and [`Stream`].
65pub trait ConventionMarker: 'static + Sized {
66    /// The extra fields in the [`PlanBase`] of this convention.
67    type Extra: 'static + Eq + Hash + Clone + Debug;
68
69    /// Get the [`Convention`] enum value.
70    fn value() -> Convention;
71}
72
73/// The marker for logical convention.
74pub struct Logical;
75impl ConventionMarker for Logical {
76    type Extra = plan_base::NoExtra;
77
78    fn value() -> Convention {
79        Convention::Logical
80    }
81}
82
83/// The marker for batch convention.
84pub struct Batch;
85impl ConventionMarker for Batch {
86    type Extra = plan_base::BatchExtra;
87
88    fn value() -> Convention {
89        Convention::Batch
90    }
91}
92
93/// The marker for stream convention.
94pub struct Stream;
95impl ConventionMarker for Stream {
96    type Extra = plan_base::StreamExtra;
97
98    fn value() -> Convention {
99        Convention::Stream
100    }
101}
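
// For illustration (a sketch, not code used in this module): because the convention is a type
// parameter of `PlanBase`, stream-only state lives in `plan_base::StreamExtra` and cannot be
// reached through a `PlanBase<Logical>` at compile time.
//
//     fn needs_stream_base(base: &PlanBase<Stream>) { /* may use stream-only fields */ }
//     // needs_stream_base(&logical_base); // does not compile: expected `Stream`, found `Logical`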
102
103/// The trait for accessing the meta data and [`PlanBase`] for plan nodes.
104pub trait PlanNodeMeta {
105    type Convention: ConventionMarker;
106
107    const NODE_TYPE: PlanNodeType;
108
109    /// Get the reference to the [`PlanBase`] with corresponding convention.
110    fn plan_base(&self) -> &PlanBase<Self::Convention>;
111
112    /// Get the reference to the [`PlanBase`] with erased convention.
113    ///
114    /// This is mainly used for implementing [`AnyPlanNodeMeta`]. Callers should prefer
115    /// [`PlanNodeMeta::plan_base`] instead as it is more type-safe.
116    fn plan_base_ref(&self) -> PlanBaseRef<'_>;
117}
118
119// Intentionally made private.
120mod plan_node_meta {
121    use super::*;
122
123    /// The object-safe version of [`PlanNodeMeta`], used as a super trait of [`PlanNode`].
124    ///
125    /// Check [`PlanNodeMeta`] for more details.
126    pub trait AnyPlanNodeMeta {
127        fn node_type(&self) -> PlanNodeType;
128        fn plan_base(&self) -> PlanBaseRef<'_>;
129        fn convention(&self) -> Convention;
130    }
131
132    /// Implement [`AnyPlanNodeMeta`] for all [`PlanNodeMeta`].
133    impl<P> AnyPlanNodeMeta for P
134    where
135        P: PlanNodeMeta,
136    {
137        fn node_type(&self) -> PlanNodeType {
138            P::NODE_TYPE
139        }
140
141        fn plan_base(&self) -> PlanBaseRef<'_> {
142            PlanNodeMeta::plan_base_ref(self)
143        }
144
145        fn convention(&self) -> Convention {
146            P::Convention::value()
147        }
148    }
149}
150use plan_node_meta::AnyPlanNodeMeta;
151
/// The common trait over all plan nodes. Used by the optimizer framework, which treats all nodes
/// as `dyn PlanNode`.
///
/// We split the trait into many sub-traits so that we can easily use macros to implement them.
156pub trait PlanNode:
157    PlanTreeNode
158    + DynClone
159    + DynEq
160    + DynHash
161    + Distill
162    + Debug
163    + Downcast
164    + ColPrunable
165    + ExprRewritable
166    + ExprVisitable
167    + ToBatch
168    + ToStream
169    + ToDistributedBatch
170    + ToPb
171    + ToLocalBatch
172    + PredicatePushdown
173    + AnyPlanNodeMeta
174{
175}
176
177impl Hash for dyn PlanNode {
178    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
179        self.dyn_hash(state);
180    }
181}
182
183impl PartialEq for dyn PlanNode {
184    fn eq(&self, other: &Self) -> bool {
185        self.dyn_eq(other.as_dyn_eq())
186    }
187}
188
189impl Eq for dyn PlanNode {}
190
191impl_downcast!(PlanNode);
192
193// Using a new type wrapper allows direct function implementation on `PlanRef`,
194// and we currently need a manual implementation of `PartialEq` for `PlanRef`.
195#[allow(clippy::derived_hash_with_manual_eq)]
196#[derive(Clone, Debug, Eq, Hash)]
197pub struct PlanRef(Rc<dyn PlanNode>);
198
199// Cannot use the derived implementation for now.
200// See https://github.com/rust-lang/rust/issues/31740
201#[allow(clippy::op_ref)]
202impl PartialEq for PlanRef {
203    fn eq(&self, other: &Self) -> bool {
204        &self.0 == &other.0
205    }
206}
207
208impl Deref for PlanRef {
209    type Target = dyn PlanNode;
210
211    fn deref(&self) -> &Self::Target {
212        self.0.deref()
213    }
214}
215
216impl<T: PlanNode> From<T> for PlanRef {
217    fn from(value: T) -> Self {
218        PlanRef(Rc::new(value))
219    }
220}
221
222impl Layer for PlanRef {
223    type Sub = Self;
224
225    fn map<F>(self, f: F) -> Self
226    where
227        F: FnMut(Self::Sub) -> Self::Sub,
228    {
229        self.clone_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
230    }
231
232    fn descent<F>(&self, f: F)
233    where
234        F: FnMut(&Self::Sub),
235    {
236        self.inputs().iter().for_each(f);
237    }
238}
239
240#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
241pub struct PlanNodeId(pub i32);
242
/// A more sophisticated `Endo` that takes the DAG structure of `PlanRef` into account.
/// In addition to `Endo`, one has to specify the `cached` function
/// to persist transformed `LogicalShare` nodes and their results,
/// and the `dag_apply` function will take care of transforming each `LogicalShare` node only once.
///
/// Note: Due to the way super traits are designed in Rust,
/// one needs separate implementation blocks for `Endo<PlanRef>` and `EndoPlan`.
/// Conventionally the real transformation `apply` lives under `Endo<PlanRef>`,
/// although one can refer to `dag_apply` in the implementation of `apply`.
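///
/// # Example (sketch)
///
/// An illustrative implementor, with hypothetical names and assuming the type also implements
/// `Endo<PlanRef>`, that memoizes transformed `LogicalShare` nodes by their id:
///
/// ```ignore
/// struct ShareAwareRewriter {
///     share_cache: HashMap<PlanNodeId, PlanRef>,
/// }
///
/// impl EndoPlan for ShareAwareRewriter {
///     fn cached<F>(&mut self, plan: PlanRef, mut f: F) -> PlanRef
///     where
///         F: FnMut(&mut Self) -> PlanRef,
///     {
///         if let Some(hit) = self.share_cache.get(&plan.id()) {
///             return hit.clone();
///         }
///         let transformed = f(self);
///         self.share_cache.insert(plan.id(), transformed.clone());
///         transformed
///     }
/// }
/// ```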
252pub trait EndoPlan: Endo<PlanRef> {
    // Return the cached result of `plan` if present,
    // otherwise store and return the value provided by `f`.
    // Notice that to allow mutable access of `self` in `f`,
    // we let `f` take `&mut Self` as its first argument.
257    fn cached<F>(&mut self, plan: PlanRef, f: F) -> PlanRef
258    where
259        F: FnMut(&mut Self) -> PlanRef;
260
261    fn dag_apply(&mut self, plan: PlanRef) -> PlanRef {
262        match plan.as_logical_share() {
263            Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
264            None => self.tree_apply(plan),
265        }
266    }
267}
268
/// A more sophisticated `Visit` that takes the DAG structure of `PlanRef` into account.
/// In addition to `Visit`, one has to specify `visited`
/// to store and report visited `LogicalShare` nodes,
/// and the `dag_visit` function will take care of visiting each `LogicalShare` node only once.
/// See also `EndoPlan`.
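///
/// # Example (sketch)
///
/// An illustrative implementor, with hypothetical names and assuming the type also implements
/// `Visit<PlanRef>`, that tracks visited `LogicalShare` nodes by their id:
///
/// ```ignore
/// struct ShareAwareVisitor {
///     visited_shares: HashSet<PlanNodeId>,
/// }
///
/// impl VisitPlan for ShareAwareVisitor {
///     fn visited<F>(&mut self, plan: &PlanRef, mut f: F)
///     where
///         F: FnMut(&mut Self),
///     {
///         // Only descend into a share node the first time we see it.
///         if self.visited_shares.insert(plan.id()) {
///             f(self);
///         }
///     }
/// }
/// ```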
274pub trait VisitPlan: Visit<PlanRef> {
    // Skip visiting `plan` if it has been visited, otherwise run the traversal provided by `f`.
    // Notice that to allow mutable access of `self` in `f`,
    // we let `f` take `&mut Self` as its first argument.
278    fn visited<F>(&mut self, plan: &PlanRef, f: F)
279    where
280        F: FnMut(&mut Self);
281
282    fn dag_visit(&mut self, plan: &PlanRef) {
283        match plan.as_logical_share() {
284            Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
285            None => self.tree_visit(plan),
286        }
287    }
288}
289
290#[derive(Clone, Copy, Debug, PartialEq)]
291pub enum Convention {
292    Logical,
293    Batch,
294    Stream,
295}
296
297pub(crate) trait RewriteExprsRecursive {
298    fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef;
299}
300
301impl RewriteExprsRecursive for PlanRef {
302    fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef {
303        let new = self.rewrite_exprs(r);
304        let inputs: Vec<PlanRef> = new
305            .inputs()
306            .iter()
307            .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
308            .collect();
309        new.clone_with_inputs(&inputs[..])
310    }
311}
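
// Usage sketch (illustrative, `my_rewriter` is hypothetical): apply an `ExprRewriter` to every
// expression in the plan tree. Nodes are immutable, so a new tree is returned:
//
//     let rewritten = plan.rewrite_exprs_recursive(&mut my_rewriter);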
312
313pub(crate) trait VisitExprsRecursive {
314    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
315}
316
317impl VisitExprsRecursive for PlanRef {
318    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
319        self.visit_exprs(r);
320        self.inputs()
321            .iter()
322            .for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
323    }
324}
325
326impl PlanRef {
327    pub fn expect_stream_key(&self) -> &[usize] {
328        self.stream_key().unwrap_or_else(|| {
329            panic!(
                "a stream key is expected but does not exist, plan:\n{}",
331                self.explain_to_string()
332            )
333        })
334    }
335
336    fn prune_col_inner(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
337        if let Some(logical_share) = self.as_logical_share() {
338            // Check the share cache first. If cache exists, it means this is the second round of
339            // column pruning.
340            if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
                // Piggyback share removal if it has only one parent.
342                if ctx.get_parent_num(logical_share) == 1 {
343                    let input: PlanRef = logical_share.input();
344                    return input.prune_col(required_cols, ctx);
345                }
346
347                // If it is the first visit, recursively call `prune_col` for its input and
348                // replace it.
349                if ctx.visit_share_at_second_round(self.id()) {
350                    let new_logical_share: &LogicalShare = new_share
351                        .as_logical_share()
352                        .expect("must be share operator");
353                    let new_share_input = new_logical_share.input().prune_col(
354                        &(0..new_logical_share.base.schema().len()).collect_vec(),
355                        ctx,
356                    );
357                    new_logical_share.replace_input(new_share_input);
358                }
359
360                // Calculate the new required columns based on the new share.
361                let new_required_cols: Vec<usize> = required_cols
362                    .iter()
363                    .map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
364                    .collect_vec();
365                let mapping = ColIndexMapping::with_remaining_columns(
366                    &new_required_cols,
367                    new_share.schema().len(),
368                );
369                return LogicalProject::with_mapping(new_share, mapping).into();
370            }
371
            // `LogicalShare` can't be cloned, so we implement column pruning for `LogicalShare`
            // here.
            // Basically, we need to wait for all parents of `LogicalShare` to prune columns before
            // we merge the required columns and prune.
376            let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
377            if parent_has_pushed == ctx.get_parent_num(logical_share) {
378                let merge_require_cols = ctx
379                    .take_required_cols(self.id())
380                    .expect("must have required columns")
381                    .into_iter()
382                    .flat_map(|x| x.into_iter())
383                    .sorted()
384                    .dedup()
385                    .collect_vec();
386                let input: PlanRef = logical_share.input();
387                let input = input.prune_col(&merge_require_cols, ctx);
388
389                // Cache the new share operator for the second round.
390                let new_logical_share = LogicalShare::create(input.clone());
391                ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());
392
393                let exprs = logical_share
394                    .base
395                    .schema()
396                    .fields
397                    .iter()
398                    .enumerate()
399                    .map(|(i, field)| {
400                        if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
401                            ExprImpl::InputRef(Box::new(InputRef::new(
402                                pos,
403                                field.data_type.clone(),
404                            )))
405                        } else {
406                            ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
407                        }
408                    })
409                    .collect_vec();
410                let project = LogicalProject::create(input, exprs);
411                logical_share.replace_input(project);
412            }
413            let mapping =
414                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
415            LogicalProject::with_mapping(self.clone(), mapping).into()
416        } else {
417            // Dispatch to dyn PlanNode instead of PlanRef.
418            let dyn_t = self.deref();
419            dyn_t.prune_col(required_cols, ctx)
420        }
421    }
422
423    fn predicate_pushdown_inner(
424        &self,
425        predicate: Condition,
426        ctx: &mut PredicatePushdownContext,
427    ) -> PlanRef {
428        if let Some(logical_share) = self.as_logical_share() {
            // Piggyback share removal if it has only one parent.
430            if ctx.get_parent_num(logical_share) == 1 {
431                let input: PlanRef = logical_share.input();
432                return input.predicate_pushdown(predicate, ctx);
433            }
434
            // `LogicalShare` can't be cloned, so we implement predicate pushdown for `LogicalShare`
            // here.
            // Basically, we need to wait for all parents of `LogicalShare` to push down their
            // predicates before we merge the predicates and push them down.
439            let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
440            if parent_has_pushed == ctx.get_parent_num(logical_share) {
441                let merge_predicate = ctx
442                    .take_predicate(self.id())
443                    .expect("must have predicate")
444                    .into_iter()
445                    .map(|mut c| Condition {
446                        conjunctions: c
447                            .conjunctions
448                            .extract_if(.., |e| {
                                // If a predicate contains `now()`, an impure function, or a correlated input ref, don't push it through the share operator.
                                // A predicate containing `now()` is regarded as a temporal filter predicate, which will be transformed into a temporal filter operator and cannot be OR-ed with other predicates.
451                                let mut finder = ExprCorrelatedIdFinder::default();
452                                finder.visit_expr(e);
453                                e.count_nows() == 0
454                                    && e.is_pure()
455                                    && !finder.has_correlated_input_ref()
456                            })
457                            .collect(),
458                    })
459                    .reduce(|a, b| a.or(b))
460                    .unwrap();
461
                // Rewrite the *entire* predicate for `LogicalShare`
                // before pushing it down to whatever plan node(s) follow.
                // The reason we apply a "special" optimization here,
                // rather than an explicit rule during stream or batch
                // plan optimization, is that predicate pushdown
                // pushes all predicates down *at once*, so rules
                // cannot be applied in the middle.
                // Thus we need an on-the-fly (in-the-middle) rewrite
                // to enable this kind of optimization.
471                let mut expr_rewriter = ExpressionSimplifyRewriter {};
472                let mut new_predicate = Condition::true_cond();
473
474                for c in merge_predicate.conjunctions {
475                    let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
476                    // rebuild the conjunctions
477                    new_predicate = new_predicate.and(c);
478                }
479
480                let input: PlanRef = logical_share.input();
481                let input = input.predicate_pushdown(new_predicate, ctx);
482                logical_share.replace_input(input);
483            }
484            LogicalFilter::create(self.clone(), predicate)
485        } else {
486            // Dispatch to dyn PlanNode instead of PlanRef.
487            let dyn_t = self.deref();
488            dyn_t.predicate_pushdown(predicate, ctx)
489        }
490    }
491}
492
493impl ColPrunable for PlanRef {
494    #[allow(clippy::let_and_return)]
495    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
496        let res = self.prune_col_inner(required_cols, ctx);
497        #[cfg(debug_assertions)]
498        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
499            "column pruning",
500            &LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
501            &res,
502        );
503        res
504    }
505}
506
507impl PredicatePushdown for PlanRef {
508    #[allow(clippy::let_and_return)]
509    fn predicate_pushdown(
510        &self,
511        predicate: Condition,
512        ctx: &mut PredicatePushdownContext,
513    ) -> PlanRef {
514        #[cfg(debug_assertions)]
515        let predicate_clone = predicate.clone();
516
517        let res = self.predicate_pushdown_inner(predicate, ctx);
518
519        #[cfg(debug_assertions)]
520        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
521            "predicate push down",
522            &LogicalFilter::new(self.clone(), predicate_clone).into(),
523            &res,
524        );
525
526        res
527    }
528}
529
530impl PlanTreeNode for PlanRef {
531    fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
532        // Dispatch to dyn PlanNode instead of PlanRef.
533        let dyn_t = self.deref();
534        dyn_t.inputs()
535    }
536
537    fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
538        if let Some(logical_share) = self.clone().as_logical_share() {
539            assert_eq!(inputs.len(), 1);
            // We can't clone `LogicalShare`; we can only replace its input instead.
541            logical_share.replace_input(inputs[0].clone());
542            self.clone()
543        } else if let Some(stream_share) = self.clone().as_stream_share() {
544            assert_eq!(inputs.len(), 1);
            // We can't clone `StreamShare`; we can only replace its input instead.
546            stream_share.replace_input(inputs[0].clone());
547            self.clone()
548        } else {
549            // Dispatch to dyn PlanNode instead of PlanRef.
550            let dyn_t = self.deref();
551            dyn_t.clone_with_inputs(inputs)
552        }
553    }
554}
555
556/// Implement again for the `dyn` newtype wrapper.
557impl AnyPlanNodeMeta for PlanRef {
558    fn node_type(&self) -> PlanNodeType {
559        self.0.node_type()
560    }
561
562    fn plan_base(&self) -> PlanBaseRef<'_> {
563        self.0.plan_base()
564    }
565
566    fn convention(&self) -> Convention {
567        self.0.convention()
568    }
569}
570
571/// Allow access to all fields defined in [`GenericPlanRef`] for the type-erased plan node.
572// TODO: may also implement on `dyn PlanNode` directly.
573impl GenericPlanRef for PlanRef {
574    fn id(&self) -> PlanNodeId {
575        self.plan_base().id()
576    }
577
578    fn schema(&self) -> &Schema {
579        self.plan_base().schema()
580    }
581
582    fn stream_key(&self) -> Option<&[usize]> {
583        self.plan_base().stream_key()
584    }
585
586    fn ctx(&self) -> OptimizerContextRef {
587        self.plan_base().ctx()
588    }
589
590    fn functional_dependency(&self) -> &FunctionalDependencySet {
591        self.plan_base().functional_dependency()
592    }
593}
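
// Usage sketch (illustrative): these accessors expose common metadata on the type-erased
// `PlanRef` without downcasting, e.g.
//
//     let n_cols = plan.schema().len();
//     let fd_set = plan.functional_dependency();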
594
595/// Allow access to all fields defined in [`PhysicalPlanRef`] for the type-erased plan node.
596// TODO: may also implement on `dyn PlanNode` directly.
597impl PhysicalPlanRef for PlanRef {
598    fn distribution(&self) -> &Distribution {
599        self.plan_base().distribution()
600    }
601}
602
603/// Allow access to all fields defined in [`StreamPlanRef`] for the type-erased plan node.
604// TODO: may also implement on `dyn PlanNode` directly.
605impl StreamPlanRef for PlanRef {
606    fn append_only(&self) -> bool {
607        self.plan_base().append_only()
608    }
609
610    fn emit_on_window_close(&self) -> bool {
611        self.plan_base().emit_on_window_close()
612    }
613
614    fn watermark_columns(&self) -> &WatermarkColumns {
615        self.plan_base().watermark_columns()
616    }
617
618    fn columns_monotonicity(&self) -> &MonotonicityMap {
619        self.plan_base().columns_monotonicity()
620    }
621}
622
623/// Allow access to all fields defined in [`BatchPlanRef`] for the type-erased plan node.
624// TODO: may also implement on `dyn PlanNode` directly.
625impl BatchPlanRef for PlanRef {
626    fn order(&self) -> &Order {
627        self.plan_base().order()
628    }
629}
630
/// In order to let the expression display id start from 1 for explaining, hidden column names and
/// other places, we reset the expression display id to 0 and clone the whole plan to reset the
/// schema.
634pub fn reorganize_elements_id(plan: PlanRef) -> PlanRef {
635    let backup = plan.ctx().backup_elem_ids();
636    plan.ctx().reset_elem_ids();
637    let plan = PlanCloner::clone_whole_plan(plan);
638    plan.ctx().restore_elem_ids(backup);
639    plan
640}
641
pub trait Explain {
    /// Write the explain output of the whole plan tree.
    fn explain<'a>(&self) -> Pretty<'a>;

    /// Write the explain output of the whole plan tree, with node ids.
    fn explain_with_id<'a>(&self) -> Pretty<'a>;

    /// Explain the plan node and return a string.
    fn explain_to_string(&self) -> String;

    /// Explain the plan node and return a json string.
    fn explain_to_json(&self) -> String;

    /// Explain the plan node and return an xml string.
    fn explain_to_xml(&self) -> String;

    /// Explain the plan node and return a yaml string.
    fn explain_to_yaml(&self) -> String;

    /// Explain the plan node and return a dot format string.
    fn explain_to_dot(&self) -> String;
}
664
665impl Explain for PlanRef {
    /// Write the explain output of the whole plan tree.
667    fn explain<'a>(&self) -> Pretty<'a> {
668        let mut node = self.distill();
669        let inputs = self.inputs();
670        for input in inputs.iter().peekable() {
671            node.children.push(input.explain());
672        }
673        Pretty::Record(node)
674    }
675
    /// Write the explain output of the whole plan tree, with node ids.
677    fn explain_with_id<'a>(&self) -> Pretty<'a> {
678        let node_id = self.id();
679        let mut node = self.distill();
680        // NOTE(kwannoel): Can lead to poor performance if plan is very large,
681        // but we want to show the id first.
682        node.fields
683            .insert(0, ("id".into(), Pretty::display(&node_id.0)));
684        let inputs = self.inputs();
685        for input in inputs.iter().peekable() {
686            node.children.push(input.explain_with_id());
687        }
688        Pretty::Record(node)
689    }
690
691    /// Explain the plan node and return a string.
692    fn explain_to_string(&self) -> String {
693        let plan = reorganize_elements_id(self.clone());
694
695        let mut output = String::with_capacity(2048);
696        let mut config = pretty_config();
697        config.unicode(&mut output, &plan.explain());
698        output
699    }
700
701    /// Explain the plan node and return a json string.
702    fn explain_to_json(&self) -> String {
703        let plan = reorganize_elements_id(self.clone());
704        let explain_ir = plan.explain();
705        serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
706            .expect("failed to serialize plan to json")
707    }
708
    /// Explain the plan node and return an xml string.
710    fn explain_to_xml(&self) -> String {
711        let plan = reorganize_elements_id(self.clone());
712        let explain_ir = plan.explain();
713        quick_xml::se::to_string(&PrettySerde(explain_ir, true))
714            .expect("failed to serialize plan to xml")
715    }
716
717    /// Explain the plan node and return a yaml string.
718    fn explain_to_yaml(&self) -> String {
719        let plan = reorganize_elements_id(self.clone());
720        let explain_ir = plan.explain();
721        serde_yaml::to_string(&PrettySerde(explain_ir, true))
722            .expect("failed to serialize plan to yaml")
723    }
724
725    /// Explain the plan node and return a dot format string.
726    fn explain_to_dot(&self) -> String {
727        let plan = reorganize_elements_id(self.clone());
728        let explain_ir = plan.explain_with_id();
729        let mut graph = Graph::<String, String>::new();
730        let mut nodes = HashMap::new();
731        build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
732        let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
733        dot.to_string()
734    }
735}
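
// Usage sketch (illustrative): a `PlanRef` can be rendered in any of the formats above, e.g.
//
//     let text = plan.explain_to_string(); // pretty-printed tree, element ids reorganized for display
//     let dot = plan.explain_to_dot();     // DOT graph, e.g. for rendering with Graphviz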
736
737pub(crate) fn pretty_config() -> PrettyConfig {
738    PrettyConfig {
739        indent: 3,
740        need_boundaries: false,
741        width: 2048,
742        reduced_spaces: true,
743    }
744}
745
746/// Directly implement methods for [`PlanNode`] to access the fields defined in [`GenericPlanRef`].
747// TODO: always require `GenericPlanRef` to make it more consistent.
748impl dyn PlanNode {
749    pub fn id(&self) -> PlanNodeId {
750        self.plan_base().id()
751    }
752
753    pub fn ctx(&self) -> OptimizerContextRef {
754        self.plan_base().ctx().clone()
755    }
756
757    pub fn schema(&self) -> &Schema {
758        self.plan_base().schema()
759    }
760
761    pub fn stream_key(&self) -> Option<&[usize]> {
762        self.plan_base().stream_key()
763    }
764
765    pub fn functional_dependency(&self) -> &FunctionalDependencySet {
766        self.plan_base().functional_dependency()
767    }
768}
769
770/// Recursion depth threshold for plan node visitor to send notice to user.
771pub const PLAN_DEPTH_THRESHOLD: usize = 30;
772/// Notice message for plan node visitor to send to user when the depth threshold is reached.
773pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
774Consider simplifying or splitting the query if you encounter any issues.";
775
776impl dyn PlanNode {
777    /// Serialize the plan node and its children to a stream plan proto.
778    ///
    /// Note that some operators have their own implementations of `to_stream_prost`. We have a
    /// hook inside to do some ad-hoc things.
781    pub fn to_stream_prost(
782        &self,
783        state: &mut BuildFragmentGraphState,
784    ) -> SchedulerResult<PbStreamPlan> {
785        recursive::tracker!().recurse(|t| {
786            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
787                notice_to_user(PLAN_TOO_DEEP_NOTICE);
788            }
789
790            use stream::prelude::*;
791
792            if let Some(stream_table_scan) = self.as_stream_table_scan() {
793                return stream_table_scan.adhoc_to_stream_prost(state);
794            }
795            if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
796                return stream_cdc_table_scan.adhoc_to_stream_prost(state);
797            }
798            if let Some(stream_source_scan) = self.as_stream_source_scan() {
799                return stream_source_scan.adhoc_to_stream_prost(state);
800            }
801            if let Some(stream_share) = self.as_stream_share() {
802                return stream_share.adhoc_to_stream_prost(state);
803            }
804
805            let node = Some(self.try_to_stream_prost_body(state)?);
806            let input = self
807                .inputs()
808                .into_iter()
809                .map(|plan| plan.to_stream_prost(state))
810                .try_collect()?;
811            // TODO: support pk_indices and operator_id
812            Ok(PbStreamPlan {
813                input,
814                identity: self.explain_myself_to_string(),
815                node_body: node,
816                operator_id: self.id().0 as _,
817                stream_key: self
818                    .stream_key()
819                    .unwrap_or_default()
820                    .iter()
821                    .map(|x| *x as u32)
822                    .collect(),
823                fields: self.schema().to_prost(),
824                append_only: self.plan_base().append_only(),
825            })
826        })
827    }
828
829    /// Serialize the plan node and its children to a batch plan proto.
830    pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
831        self.to_batch_prost_identity(true)
832    }
833
834    /// Serialize the plan node and its children to a batch plan proto without the identity field
835    /// (for testing).
836    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
837        recursive::tracker!().recurse(|t| {
838            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
839                notice_to_user(PLAN_TOO_DEEP_NOTICE);
840            }
841
842            let node_body = Some(self.try_to_batch_prost_body()?);
843            let children = self
844                .inputs()
845                .into_iter()
846                .map(|plan| plan.to_batch_prost_identity(identity))
847                .try_collect()?;
848            Ok(PbBatchPlan {
849                children,
850                identity: if identity {
851                    self.explain_myself_to_string()
852                } else {
853                    "".into()
854                },
855                node_body,
856            })
857        })
858    }
859
860    pub fn explain_myself_to_string(&self) -> String {
861        self.distill_to_string()
862    }
863}
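
// Usage sketch (illustrative): serializing a finished plan, where `state` is a
// `BuildFragmentGraphState` and errors surface as `SchedulerResult`:
//
//     let stream_proto = plan.to_stream_prost(&mut state)?;
//     let batch_proto = plan.to_batch_prost()?;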
864
865mod plan_base;
866pub use plan_base::*;
867#[macro_use]
868mod plan_tree_node;
869pub use plan_tree_node::*;
870mod col_pruning;
871pub use col_pruning::*;
872mod expr_rewritable;
873pub use expr_rewritable::*;
874mod expr_visitable;
875
876mod convert;
877pub use convert::*;
878mod eq_join_predicate;
879pub use eq_join_predicate::*;
880mod to_prost;
881pub use to_prost::*;
882mod predicate_pushdown;
883pub use predicate_pushdown::*;
884mod merge_eq_nodes;
885pub use merge_eq_nodes::*;
886
887pub mod batch;
888pub mod generic;
889pub mod stream;
890
891pub use generic::{PlanAggCall, PlanAggCallDisplay};
892
893mod batch_delete;
894mod batch_exchange;
895mod batch_expand;
896mod batch_filter;
897mod batch_group_topn;
898mod batch_hash_agg;
899mod batch_hash_join;
900mod batch_hop_window;
901mod batch_insert;
902mod batch_limit;
903mod batch_log_seq_scan;
904mod batch_lookup_join;
905mod batch_max_one_row;
906mod batch_nested_loop_join;
907mod batch_over_window;
908mod batch_project;
909mod batch_project_set;
910mod batch_seq_scan;
911mod batch_simple_agg;
912mod batch_sort;
913mod batch_sort_agg;
914mod batch_source;
915mod batch_sys_seq_scan;
916mod batch_table_function;
917mod batch_topn;
918mod batch_union;
919mod batch_update;
920mod batch_values;
921mod logical_agg;
922mod logical_apply;
923mod logical_cdc_scan;
924mod logical_changelog;
925mod logical_cte_ref;
926mod logical_dedup;
927mod logical_delete;
928mod logical_except;
929mod logical_expand;
930mod logical_filter;
931mod logical_hop_window;
932mod logical_insert;
933mod logical_intersect;
934mod logical_join;
935mod logical_kafka_scan;
936mod logical_limit;
937mod logical_max_one_row;
938mod logical_multi_join;
939mod logical_now;
940mod logical_over_window;
941mod logical_project;
942mod logical_project_set;
943mod logical_recursive_union;
944mod logical_scan;
945mod logical_share;
946mod logical_source;
947mod logical_sys_scan;
948mod logical_table_function;
949mod logical_topn;
950mod logical_union;
951mod logical_update;
952mod logical_values;
953mod stream_asof_join;
954mod stream_changelog;
955mod stream_dedup;
956mod stream_delta_join;
957mod stream_dml;
958mod stream_dynamic_filter;
959mod stream_eowc_over_window;
960mod stream_exchange;
961mod stream_expand;
962mod stream_filter;
963mod stream_fs_fetch;
964mod stream_global_approx_percentile;
965mod stream_group_topn;
966mod stream_hash_agg;
967mod stream_hash_join;
968mod stream_hop_window;
969mod stream_join_common;
970mod stream_local_approx_percentile;
971mod stream_materialize;
972mod stream_now;
973mod stream_over_window;
974mod stream_project;
975mod stream_project_set;
976mod stream_row_id_gen;
977mod stream_row_merge;
978mod stream_simple_agg;
979mod stream_sink;
980mod stream_sort;
981mod stream_source;
982mod stream_source_scan;
983mod stream_stateless_simple_agg;
984mod stream_sync_log_store;
985mod stream_table_scan;
986mod stream_topn;
987mod stream_values;
988mod stream_watermark_filter;
989
990mod batch_file_scan;
991mod batch_iceberg_scan;
992mod batch_kafka_scan;
993mod batch_postgres_query;
994
995mod batch_mysql_query;
996mod derive;
997mod logical_file_scan;
998mod logical_iceberg_scan;
999mod logical_postgres_query;
1000
1001mod logical_mysql_query;
1002mod stream_cdc_table_scan;
1003mod stream_share;
1004mod stream_temporal_join;
1005mod stream_union;
1006pub mod utils;
1007
1008pub use batch_delete::BatchDelete;
1009pub use batch_exchange::BatchExchange;
1010pub use batch_expand::BatchExpand;
1011pub use batch_file_scan::BatchFileScan;
1012pub use batch_filter::BatchFilter;
1013pub use batch_group_topn::BatchGroupTopN;
1014pub use batch_hash_agg::BatchHashAgg;
1015pub use batch_hash_join::BatchHashJoin;
1016pub use batch_hop_window::BatchHopWindow;
1017pub use batch_iceberg_scan::BatchIcebergScan;
1018pub use batch_insert::BatchInsert;
1019pub use batch_kafka_scan::BatchKafkaScan;
1020pub use batch_limit::BatchLimit;
1021pub use batch_log_seq_scan::BatchLogSeqScan;
1022pub use batch_lookup_join::BatchLookupJoin;
1023pub use batch_max_one_row::BatchMaxOneRow;
1024pub use batch_mysql_query::BatchMySqlQuery;
1025pub use batch_nested_loop_join::BatchNestedLoopJoin;
1026pub use batch_over_window::BatchOverWindow;
1027pub use batch_postgres_query::BatchPostgresQuery;
1028pub use batch_project::BatchProject;
1029pub use batch_project_set::BatchProjectSet;
1030pub use batch_seq_scan::BatchSeqScan;
1031pub use batch_simple_agg::BatchSimpleAgg;
1032pub use batch_sort::BatchSort;
1033pub use batch_sort_agg::BatchSortAgg;
1034pub use batch_source::BatchSource;
1035pub use batch_sys_seq_scan::BatchSysSeqScan;
1036pub use batch_table_function::BatchTableFunction;
1037pub use batch_topn::BatchTopN;
1038pub use batch_union::BatchUnion;
1039pub use batch_update::BatchUpdate;
1040pub use batch_values::BatchValues;
1041pub use logical_agg::LogicalAgg;
1042pub use logical_apply::LogicalApply;
1043pub use logical_cdc_scan::LogicalCdcScan;
1044pub use logical_changelog::LogicalChangeLog;
1045pub use logical_cte_ref::LogicalCteRef;
1046pub use logical_dedup::LogicalDedup;
1047pub use logical_delete::LogicalDelete;
1048pub use logical_except::LogicalExcept;
1049pub use logical_expand::LogicalExpand;
1050pub use logical_file_scan::LogicalFileScan;
1051pub use logical_filter::LogicalFilter;
1052pub use logical_hop_window::LogicalHopWindow;
1053pub use logical_iceberg_scan::LogicalIcebergScan;
1054pub use logical_insert::LogicalInsert;
1055pub use logical_intersect::LogicalIntersect;
1056pub use logical_join::LogicalJoin;
1057pub use logical_kafka_scan::LogicalKafkaScan;
1058pub use logical_limit::LogicalLimit;
1059pub use logical_max_one_row::LogicalMaxOneRow;
1060pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
1061pub use logical_mysql_query::LogicalMySqlQuery;
1062pub use logical_now::LogicalNow;
1063pub use logical_over_window::LogicalOverWindow;
1064pub use logical_postgres_query::LogicalPostgresQuery;
1065pub use logical_project::LogicalProject;
1066pub use logical_project_set::LogicalProjectSet;
1067pub use logical_recursive_union::LogicalRecursiveUnion;
1068pub use logical_scan::LogicalScan;
1069pub use logical_share::LogicalShare;
1070pub use logical_source::LogicalSource;
1071pub use logical_sys_scan::LogicalSysScan;
1072pub use logical_table_function::LogicalTableFunction;
1073pub use logical_topn::LogicalTopN;
1074pub use logical_union::LogicalUnion;
1075pub use logical_update::LogicalUpdate;
1076pub use logical_values::LogicalValues;
1077pub use stream_asof_join::StreamAsOfJoin;
1078pub use stream_cdc_table_scan::StreamCdcTableScan;
1079pub use stream_changelog::StreamChangeLog;
1080pub use stream_dedup::StreamDedup;
1081pub use stream_delta_join::StreamDeltaJoin;
1082pub use stream_dml::StreamDml;
1083pub use stream_dynamic_filter::StreamDynamicFilter;
1084pub use stream_eowc_over_window::StreamEowcOverWindow;
1085pub use stream_exchange::StreamExchange;
1086pub use stream_expand::StreamExpand;
1087pub use stream_filter::StreamFilter;
1088pub use stream_fs_fetch::StreamFsFetch;
1089pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
1090pub use stream_group_topn::StreamGroupTopN;
1091pub use stream_hash_agg::StreamHashAgg;
1092pub use stream_hash_join::StreamHashJoin;
1093pub use stream_hop_window::StreamHopWindow;
1094use stream_join_common::StreamJoinCommon;
1095pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
1096pub use stream_materialize::StreamMaterialize;
1097pub use stream_now::StreamNow;
1098pub use stream_over_window::StreamOverWindow;
1099pub use stream_project::StreamProject;
1100pub use stream_project_set::StreamProjectSet;
1101pub use stream_row_id_gen::StreamRowIdGen;
1102pub use stream_row_merge::StreamRowMerge;
1103pub use stream_share::StreamShare;
1104pub use stream_simple_agg::StreamSimpleAgg;
1105pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
1106pub use stream_sort::StreamEowcSort;
1107pub use stream_source::StreamSource;
1108pub use stream_source_scan::StreamSourceScan;
1109pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
1110pub use stream_sync_log_store::StreamSyncLogStore;
1111pub use stream_table_scan::StreamTableScan;
1112pub use stream_temporal_join::StreamTemporalJoin;
1113pub use stream_topn::StreamTopN;
1114pub use stream_union::StreamUnion;
1115pub use stream_values::StreamValues;
1116pub use stream_watermark_filter::StreamWatermarkFilter;
1117
1118use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
1119use crate::optimizer::optimizer_context::OptimizerContextRef;
1120use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
1121use crate::optimizer::plan_rewriter::PlanCloner;
1122use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
1123use crate::scheduler::SchedulerResult;
1124use crate::stream_fragmenter::BuildFragmentGraphState;
1125use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};
1126
/// `for_all_plan_nodes` includes all plan nodes. If you add a new plan node
/// to the project, be sure to add it here and to the macro for its convention, like `for_logical_plan_nodes`.
///
/// Every tuple has two elements: `{ convention, name }`.
1131/// You can use it as follows
1132/// ```rust
1133/// macro_rules! use_plan {
1134///     ($({ $convention:ident, $name:ident }),*) => {};
1135/// }
1136/// risingwave_frontend::for_all_plan_nodes! { use_plan }
1137/// ```
1138/// See the following implementations for example.
1139#[macro_export]
1140macro_rules! for_all_plan_nodes {
1141    ($macro:ident) => {
1142        $macro! {
1143              { Logical, Agg }
1144            , { Logical, Apply }
1145            , { Logical, Filter }
1146            , { Logical, Project }
1147            , { Logical, Scan }
1148            , { Logical, CdcScan }
1149            , { Logical, SysScan }
1150            , { Logical, Source }
1151            , { Logical, Insert }
1152            , { Logical, Delete }
1153            , { Logical, Update }
1154            , { Logical, Join }
1155            , { Logical, Values }
1156            , { Logical, Limit }
1157            , { Logical, TopN }
1158            , { Logical, HopWindow }
1159            , { Logical, TableFunction }
1160            , { Logical, MultiJoin }
1161            , { Logical, Expand }
1162            , { Logical, ProjectSet }
1163            , { Logical, Union }
1164            , { Logical, OverWindow }
1165            , { Logical, Share }
1166            , { Logical, Now }
1167            , { Logical, Dedup }
1168            , { Logical, Intersect }
1169            , { Logical, Except }
1170            , { Logical, MaxOneRow }
1171            , { Logical, KafkaScan }
1172            , { Logical, IcebergScan }
1173            , { Logical, RecursiveUnion }
1174            , { Logical, CteRef }
1175            , { Logical, ChangeLog }
1176            , { Logical, FileScan }
1177            , { Logical, PostgresQuery }
1178            , { Logical, MySqlQuery }
1179            , { Batch, SimpleAgg }
1180            , { Batch, HashAgg }
1181            , { Batch, SortAgg }
1182            , { Batch, Project }
1183            , { Batch, Filter }
1184            , { Batch, Insert }
1185            , { Batch, Delete }
1186            , { Batch, Update }
1187            , { Batch, SeqScan }
1188            , { Batch, SysSeqScan }
1189            , { Batch, LogSeqScan }
1190            , { Batch, HashJoin }
1191            , { Batch, NestedLoopJoin }
1192            , { Batch, Values }
1193            , { Batch, Sort }
1194            , { Batch, Exchange }
1195            , { Batch, Limit }
1196            , { Batch, TopN }
1197            , { Batch, HopWindow }
1198            , { Batch, TableFunction }
1199            , { Batch, Expand }
1200            , { Batch, LookupJoin }
1201            , { Batch, ProjectSet }
1202            , { Batch, Union }
1203            , { Batch, GroupTopN }
1204            , { Batch, Source }
1205            , { Batch, OverWindow }
1206            , { Batch, MaxOneRow }
1207            , { Batch, KafkaScan }
1208            , { Batch, IcebergScan }
1209            , { Batch, FileScan }
1210            , { Batch, PostgresQuery }
1211            , { Batch, MySqlQuery }
1212            , { Stream, Project }
1213            , { Stream, Filter }
1214            , { Stream, TableScan }
1215            , { Stream, CdcTableScan }
1216            , { Stream, Sink }
1217            , { Stream, Source }
1218            , { Stream, SourceScan }
1219            , { Stream, HashJoin }
1220            , { Stream, Exchange }
1221            , { Stream, HashAgg }
1222            , { Stream, SimpleAgg }
1223            , { Stream, StatelessSimpleAgg }
1224            , { Stream, Materialize }
1225            , { Stream, TopN }
1226            , { Stream, HopWindow }
1227            , { Stream, DeltaJoin }
1228            , { Stream, Expand }
1229            , { Stream, DynamicFilter }
1230            , { Stream, ProjectSet }
1231            , { Stream, GroupTopN }
1232            , { Stream, Union }
1233            , { Stream, RowIdGen }
1234            , { Stream, Dml }
1235            , { Stream, Now }
1236            , { Stream, Share }
1237            , { Stream, WatermarkFilter }
1238            , { Stream, TemporalJoin }
1239            , { Stream, Values }
1240            , { Stream, Dedup }
1241            , { Stream, EowcOverWindow }
1242            , { Stream, EowcSort }
1243            , { Stream, OverWindow }
1244            , { Stream, FsFetch }
1245            , { Stream, ChangeLog }
1246            , { Stream, GlobalApproxPercentile }
1247            , { Stream, LocalApproxPercentile }
1248            , { Stream, RowMerge }
1249            , { Stream, AsOfJoin }
1250            , { Stream, SyncLogStore }
1251        }
1252    };
1253}
1254
1255/// `for_logical_plan_nodes` includes all plan nodes with logical convention.
1256#[macro_export]
1257macro_rules! for_logical_plan_nodes {
1258    ($macro:ident) => {
1259        $macro! {
1260              { Logical, Agg }
1261            , { Logical, Apply }
1262            , { Logical, Filter }
1263            , { Logical, Project }
1264            , { Logical, Scan }
1265            , { Logical, CdcScan }
1266            , { Logical, SysScan }
1267            , { Logical, Source }
1268            , { Logical, Insert }
1269            , { Logical, Delete }
1270            , { Logical, Update }
1271            , { Logical, Join }
1272            , { Logical, Values }
1273            , { Logical, Limit }
1274            , { Logical, TopN }
1275            , { Logical, HopWindow }
1276            , { Logical, TableFunction }
1277            , { Logical, MultiJoin }
1278            , { Logical, Expand }
1279            , { Logical, ProjectSet }
1280            , { Logical, Union }
1281            , { Logical, OverWindow }
1282            , { Logical, Share }
1283            , { Logical, Now }
1284            , { Logical, Dedup }
1285            , { Logical, Intersect }
1286            , { Logical, Except }
1287            , { Logical, MaxOneRow }
1288            , { Logical, KafkaScan }
1289            , { Logical, IcebergScan }
1290            , { Logical, RecursiveUnion }
1291            , { Logical, CteRef }
1292            , { Logical, ChangeLog }
1293            , { Logical, FileScan }
1294            , { Logical, PostgresQuery }
1295            , { Logical, MySqlQuery }
1296        }
1297    };
1298}
1299
1300/// `for_batch_plan_nodes` includes all plan nodes with batch convention.
1301#[macro_export]
1302macro_rules! for_batch_plan_nodes {
1303    ($macro:ident) => {
1304        $macro! {
1305              { Batch, SimpleAgg }
1306            , { Batch, HashAgg }
1307            , { Batch, SortAgg }
1308            , { Batch, Project }
1309            , { Batch, Filter }
1310            , { Batch, SeqScan }
1311            , { Batch, SysSeqScan }
1312            , { Batch, LogSeqScan }
1313            , { Batch, HashJoin }
1314            , { Batch, NestedLoopJoin }
1315            , { Batch, Values }
1316            , { Batch, Limit }
1317            , { Batch, Sort }
1318            , { Batch, TopN }
1319            , { Batch, Exchange }
1320            , { Batch, Insert }
1321            , { Batch, Delete }
1322            , { Batch, Update }
1323            , { Batch, HopWindow }
1324            , { Batch, TableFunction }
1325            , { Batch, Expand }
1326            , { Batch, LookupJoin }
1327            , { Batch, ProjectSet }
1328            , { Batch, Union }
1329            , { Batch, GroupTopN }
1330            , { Batch, Source }
1331            , { Batch, OverWindow }
1332            , { Batch, MaxOneRow }
1333            , { Batch, KafkaScan }
1334            , { Batch, IcebergScan }
1335            , { Batch, FileScan }
1336            , { Batch, PostgresQuery }
1337            , { Batch, MySqlQuery }
1338        }
1339    };
1340}
1341
1342/// `for_stream_plan_nodes` includes all plan nodes with stream convention.
1343#[macro_export]
1344macro_rules! for_stream_plan_nodes {
1345    ($macro:ident) => {
1346        $macro! {
1347              { Stream, Project }
1348            , { Stream, Filter }
1349            , { Stream, HashJoin }
1350            , { Stream, Exchange }
1351            , { Stream, TableScan }
1352            , { Stream, CdcTableScan }
1353            , { Stream, Sink }
1354            , { Stream, Source }
1355            , { Stream, SourceScan }
1356            , { Stream, HashAgg }
1357            , { Stream, SimpleAgg }
1358            , { Stream, StatelessSimpleAgg }
1359            , { Stream, Materialize }
1360            , { Stream, TopN }
1361            , { Stream, HopWindow }
1362            , { Stream, DeltaJoin }
1363            , { Stream, Expand }
1364            , { Stream, DynamicFilter }
1365            , { Stream, ProjectSet }
1366            , { Stream, GroupTopN }
1367            , { Stream, Union }
1368            , { Stream, RowIdGen }
1369            , { Stream, Dml }
1370            , { Stream, Now }
1371            , { Stream, Share }
1372            , { Stream, WatermarkFilter }
1373            , { Stream, TemporalJoin }
1374            , { Stream, Values }
1375            , { Stream, Dedup }
1376            , { Stream, EowcOverWindow }
1377            , { Stream, EowcSort }
1378            , { Stream, OverWindow }
1379            , { Stream, FsFetch }
1380            , { Stream, ChangeLog }
1381            , { Stream, GlobalApproxPercentile }
1382            , { Stream, LocalApproxPercentile }
1383            , { Stream, RowMerge }
1384            , { Stream, AsOfJoin }
1385            , { Stream, SyncLogStore }
1386        }
1387    };
1388}
1389
/// Define [`PlanNodeType`] and implement [`PlanNodeMeta`] for each node.
1391macro_rules! impl_plan_node_meta {
1392    ($( { $convention:ident, $name:ident }),*) => {
1393        paste!{
            /// Each enum value represents a `PlanNode` struct type, helping us to dispatch and downcast.
1395            #[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
1396            pub enum PlanNodeType {
1397                $( [<$convention $name>] ),*
1398            }
1399
1400            $(impl PlanNodeMeta for [<$convention $name>] {
1401                type Convention = $convention;
1402                const NODE_TYPE: PlanNodeType = PlanNodeType::[<$convention $name>];
1403
1404                fn plan_base(&self) -> &PlanBase<$convention> {
1405                    &self.base
1406                }
1407
1408                fn plan_base_ref(&self) -> PlanBaseRef<'_> {
1409                    PlanBaseRef::$convention(&self.base)
1410                }
1411            }
1412
1413            impl Deref for [<$convention $name>] {
1414                type Target = PlanBase<$convention>;
1415
1416                fn deref(&self) -> &Self::Target {
1417                    &self.base
1418                }
1419            })*
1420        }
1421    }
1422}
1423
1424for_all_plan_nodes! { impl_plan_node_meta }
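
// Expansion sketch for a single entry, e.g. `{ Stream, Project }` (illustrative, not literal
// macro output; the real expansion also includes a `Deref` impl to `PlanBase<Stream>`):
//
//     impl PlanNodeMeta for StreamProject {
//         type Convention = Stream;
//         const NODE_TYPE: PlanNodeType = PlanNodeType::StreamProject;
//
//         fn plan_base(&self) -> &PlanBase<Stream> {
//             &self.base
//         }
//
//         fn plan_base_ref(&self) -> PlanBaseRef<'_> {
//             PlanBaseRef::Stream(&self.base)
//         }
//     }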
1425
1426macro_rules! impl_plan_node {
1427    ($({ $convention:ident, $name:ident }),*) => {
1428        paste!{
1429            $(impl PlanNode for [<$convention $name>] { })*
1430        }
1431    }
1432}
1433
1434for_all_plan_nodes! { impl_plan_node }
1435
/// Implement a downcast helper (`as_<convention>_<name>`) on `dyn PlanNode` for each node.
1437macro_rules! impl_down_cast_fn {
1438    ($( { $convention:ident, $name:ident }),*) => {
1439        paste!{
1440            impl dyn PlanNode {
1441                $( pub fn [< as_$convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
1442                    self.downcast_ref::<[<$convention $name>]>()
1443                } )*
1444            }
1445        }
1446    }
1447}
1448
1449for_all_plan_nodes! { impl_down_cast_fn }
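
// Usage sketch (illustrative): the generated helpers let callers branch on a concrete node type
// without calling `downcast_ref` by hand, e.g.
//
//     if let Some(share) = plan.as_logical_share() {
//         // `share` is a `&LogicalShare` here.
//     }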