risingwave_frontend/optimizer/plan_node/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines all kinds of node in the plan tree, each node represent a relational expression.
16//!
17//! We use a immutable style tree structure, every Node are immutable and cannot be modified after
18//! it has been created. If you want to modify the node, such as rewriting the expression in a
19//! `ProjectNode` or changing a node's input node, you need to create a new node. We use Rc as the
20//! node's reference, and a node just storage its inputs' reference, so change a node just need
21//! create one new node but not the entire sub-tree.
22//!
23//! So when you want to add a new node, make sure:
24//! - each field in the node struct are private
25//! - recommend to implement the construction of Node in a unified `new()` function, if have multi
26//!   methods to construct, make they have a consistent behavior
27//! - all field should be valued in construction, so the properties' derivation should be finished
28//!   in the `new()` function.
29
30use std::collections::HashMap;
31use std::fmt::Debug;
32use std::hash::Hash;
33use std::marker::PhantomData;
34use std::ops::Deref;
35use std::rc::Rc;
36
37use downcast_rs::{Downcast, impl_downcast};
38use dyn_clone::DynClone;
39use itertools::Itertools;
40use paste::paste;
41use petgraph::dot::{Config, Dot};
42use petgraph::graph::Graph;
43use pretty_xmlish::{Pretty, PrettyConfig};
44use risingwave_common::catalog::Schema;
45use risingwave_common::util::recursive::{self, Recurse};
46use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
47use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
48use serde::Serialize;
49
50use self::batch::BatchPlanNodeMetadata;
51use self::generic::{GenericPlanRef, PhysicalPlanRef};
52use self::stream::StreamPlanNodeMetadata;
53use self::utils::Distill;
54use super::property::{
55    Distribution, FunctionalDependencySet, MonotonicityMap, Order, WatermarkColumns,
56};
57use crate::error::{ErrorCode, Result};
58use crate::optimizer::ExpressionSimplifyRewriter;
59use crate::optimizer::property::StreamKind;
60use crate::session::current::notice_to_user;
61use crate::utils::{PrettySerde, build_graph_from_pretty};
62
63/// A marker trait for different conventions, used for enforcing type safety.
64///
65/// Implementors are [`Logical`], [`Batch`], and [`Stream`].
66pub trait ConventionMarker: 'static + Sized + Clone + Debug + Eq + PartialEq + Hash {
67    /// The extra fields in the [`PlanBase`] of this convention.
68    type Extra: 'static + Eq + Hash + Clone + Debug;
69    type ShareNode: ShareNode<Self>;
70    type PlanRefDyn: PlanNodeCommon<Self> + Eq + Hash + ?Sized;
71    type PlanNodeType;
72
73    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode>;
74}
75
76pub trait ShareNode<C: ConventionMarker>:
77    AnyPlanNodeMeta<C> + PlanTreeNodeUnary<C> + 'static
78{
79    fn new_share(share: generic::Share<PlanRef<C>>) -> PlanRef<C>;
80    fn replace_input(&self, plan: PlanRef<C>);
81}
82
83pub struct NoShareNode<C: ConventionMarker>(!, PhantomData<C>);
84
85impl<C: ConventionMarker> ShareNode<C> for NoShareNode<C> {
86    fn new_share(_plan: generic::Share<PlanRef<C>>) -> PlanRef<C> {
87        unreachable!()
88    }
89
90    fn replace_input(&self, _plan: PlanRef<C>) {
91        unreachable!()
92    }
93}
94
95impl<C: ConventionMarker> PlanTreeNodeUnary<C> for NoShareNode<C> {
96    fn input(&self) -> PlanRef<C> {
97        unreachable!()
98    }
99
100    fn clone_with_input(&self, _input: PlanRef<C>) -> Self {
101        unreachable!()
102    }
103}
104
105impl<C: ConventionMarker> AnyPlanNodeMeta<C> for NoShareNode<C> {
106    fn node_type(&self) -> C::PlanNodeType {
107        unreachable!()
108    }
109
110    fn plan_base(&self) -> &PlanBase<C> {
111        unreachable!()
112    }
113}
114
115/// The marker for logical convention.
116#[derive(Clone, Debug, Eq, PartialEq, Hash)]
117pub struct Logical;
118impl ConventionMarker for Logical {
119    type Extra = plan_base::NoExtra;
120    type PlanNodeType = LogicalPlanNodeType;
121    type PlanRefDyn = dyn LogicalPlanNode;
122    type ShareNode = LogicalShare;
123
124    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
125        plan.as_logical_share()
126    }
127}
128
129/// The marker for batch convention.
130#[derive(Clone, Debug, Eq, PartialEq, Hash)]
131pub struct Batch;
132impl ConventionMarker for Batch {
133    type Extra = plan_base::BatchExtra;
134    type PlanNodeType = BatchPlanNodeType;
135    type PlanRefDyn = dyn BatchPlanNode;
136    type ShareNode = NoShareNode<Batch>;
137
138    fn as_share(_plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
139        None
140    }
141}
142
143/// The marker for stream convention.
144#[derive(Clone, Debug, Eq, PartialEq, Hash)]
145pub struct Stream;
146impl ConventionMarker for Stream {
147    type Extra = plan_base::StreamExtra;
148    type PlanNodeType = StreamPlanNodeType;
149    type PlanRefDyn = dyn StreamPlanNode;
150    type ShareNode = StreamShare;
151
152    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
153        plan.as_stream_share()
154    }
155}
156
157/// The trait for accessing the meta data and [`PlanBase`] for plan nodes.
158pub trait PlanNodeMeta {
159    type Convention: ConventionMarker;
160    const NODE_TYPE: <Self::Convention as ConventionMarker>::PlanNodeType;
161    /// Get the reference to the [`PlanBase`] with corresponding convention.
162    fn plan_base(&self) -> &PlanBase<Self::Convention>;
163}
164
165// Intentionally made private.
166mod plan_node_meta {
167    use super::*;
168
169    /// The object-safe version of [`PlanNodeMeta`], used as a super trait of `PlanNode`.
170    ///
171    /// Check [`PlanNodeMeta`] for more details.
172    pub trait AnyPlanNodeMeta<C: ConventionMarker> {
173        fn node_type(&self) -> C::PlanNodeType;
174        fn plan_base(&self) -> &PlanBase<C>;
175    }
176
177    /// Implement [`AnyPlanNodeMeta`] for all [`PlanNodeMeta`].
178    impl<P> AnyPlanNodeMeta<P::Convention> for P
179    where
180        P: PlanNodeMeta,
181    {
182        fn node_type(&self) -> <P::Convention as ConventionMarker>::PlanNodeType {
183            P::NODE_TYPE
184        }
185
186        fn plan_base(&self) -> &PlanBase<P::Convention> {
187            <Self as PlanNodeMeta>::plan_base(self)
188        }
189    }
190}
191use plan_node_meta::AnyPlanNodeMeta;
192
193pub trait PlanNodeCommon<C: ConventionMarker> = PlanTreeNode<C>
194    + DynClone
195    + DynEq
196    + DynHash
197    + Distill
198    + Debug
199    + Downcast
200    + ExprRewritable<C>
201    + ExprVisitable
202    + AnyPlanNodeMeta<C>;
203
204/// The common trait over all plan nodes. Used by optimizer framework which will treat all node as
205/// `dyn PlanNode`
206///
207/// We split the trait into lots of sub-trait so that we can easily use macro to impl them.
208pub trait StreamPlanNode: PlanNodeCommon<Stream> + TryToStreamPb {}
209pub trait BatchPlanNode:
210    PlanNodeCommon<Batch> + ToDistributedBatch + ToLocalBatch + TryToBatchPb
211{
212}
213pub trait LogicalPlanNode:
214    PlanNodeCommon<Logical> + ColPrunable + PredicatePushdown + ToBatch + ToStream
215{
216}
217
218macro_rules! impl_trait {
219    ($($convention:ident),+) => {
220        paste! {
221            $(
222                impl Hash for dyn [<$convention  PlanNode>] {
223                    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
224                        self.dyn_hash(state);
225                    }
226                }
227
228                impl PartialEq for dyn [<$convention  PlanNode>] {
229                    fn eq(&self, other: &Self) -> bool {
230                        self.dyn_eq(other.as_dyn_eq())
231                    }
232                }
233
234                impl Eq for dyn [<$convention  PlanNode>] {}
235            )+
236        }
237    };
238}
239
240impl_trait!(Batch, Stream, Logical);
241impl_downcast!(BatchPlanNode);
242impl_downcast!(LogicalPlanNode);
243impl_downcast!(StreamPlanNode);
244
245// Using a new type wrapper allows direct function implementation on `PlanRef`,
246// and we currently need a manual implementation of `PartialEq` for `PlanRef`.
247#[allow(clippy::derived_hash_with_manual_eq)]
248#[derive(Debug, Eq, Hash)]
249pub struct PlanRef<C: ConventionMarker>(Rc<C::PlanRefDyn>);
250
251impl<C: ConventionMarker> Clone for PlanRef<C> {
252    fn clone(&self) -> Self {
253        Self(self.0.clone())
254    }
255}
256
257pub type LogicalPlanRef = PlanRef<Logical>;
258pub type StreamPlanRef = PlanRef<Stream>;
259pub type BatchPlanRef = PlanRef<Batch>;
260
261// Cannot use the derived implementation for now.
262// See https://github.com/rust-lang/rust/issues/31740
263#[allow(clippy::op_ref)]
264impl<C: ConventionMarker> PartialEq for PlanRef<C> {
265    fn eq(&self, other: &Self) -> bool {
266        &self.0 == &other.0
267    }
268}
269
270impl<C: ConventionMarker> Deref for PlanRef<C> {
271    type Target = C::PlanRefDyn;
272
273    fn deref(&self) -> &Self::Target {
274        self.0.deref()
275    }
276}
277
278impl<T: LogicalPlanNode> From<T> for PlanRef<Logical> {
279    fn from(value: T) -> Self {
280        PlanRef(Rc::new(value) as _)
281    }
282}
283
284impl<T: StreamPlanNode> From<T> for PlanRef<Stream> {
285    fn from(value: T) -> Self {
286        PlanRef(Rc::new(value) as _)
287    }
288}
289
290impl<T: BatchPlanNode> From<T> for PlanRef<Batch> {
291    fn from(value: T) -> Self {
292        PlanRef(Rc::new(value) as _)
293    }
294}
295
296impl<C: ConventionMarker> Layer for PlanRef<C> {
297    type Sub = Self;
298
299    fn map<F>(self, f: F) -> Self
300    where
301        F: FnMut(Self::Sub) -> Self::Sub,
302    {
303        self.clone_root_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
304    }
305
306    fn descent<F>(&self, f: F)
307    where
308        F: FnMut(&Self::Sub),
309    {
310        self.inputs().iter().for_each(f);
311    }
312}
313
314#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
315pub struct PlanNodeId(pub i32);
316
317/// A more sophisticated `Endo` taking into account of the DAG structure of `PlanRef`.
318/// In addition to `Endo`, one have to specify the `cached` function
319/// to persist transformed `LogicalShare` and their results,
320/// and the `dag_apply` function will take care to only transform every `LogicalShare` nodes once.
321///
322/// Note: Due to the way super trait is designed in rust,
323/// one need to have separate implementation blocks of `Endo<PlanRef>` and `EndoPlan`.
324/// And conventionally the real transformation `apply` is under `Endo<PlanRef>`,
325/// although one can refer to `dag_apply` in the implementation of `apply`.
326pub trait EndoPlan: Endo<LogicalPlanRef> {
327    // Return the cached result of `plan` if present,
328    // otherwise store and return the value provided by `f`.
329    // Notice that to allow mutable access of `self` in `f`,
330    // we let `f` to take `&mut Self` as its first argument.
331    fn cached<F>(&mut self, plan: LogicalPlanRef, f: F) -> LogicalPlanRef
332    where
333        F: FnMut(&mut Self) -> LogicalPlanRef;
334
335    fn dag_apply(&mut self, plan: LogicalPlanRef) -> LogicalPlanRef {
336        match plan.as_logical_share() {
337            Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
338            None => self.tree_apply(plan),
339        }
340    }
341}
342
343/// A more sophisticated `Visit` taking into account of the DAG structure of `PlanRef`.
344/// In addition to `Visit`, one have to specify `visited`
345/// to store and report visited `LogicalShare` nodes,
346/// and the `dag_visit` function will take care to only visit every `LogicalShare` nodes once.
347/// See also `EndoPlan`.
348pub trait VisitPlan: Visit<LogicalPlanRef> {
349    // Skip visiting `plan` if visited, otherwise run the traversal provided by `f`.
350    // Notice that to allow mutable access of `self` in `f`,
351    // we let `f` to take `&mut Self` as its first argument.
352    fn visited<F>(&mut self, plan: &LogicalPlanRef, f: F)
353    where
354        F: FnMut(&mut Self);
355
356    fn dag_visit(&mut self, plan: &LogicalPlanRef) {
357        match plan.as_logical_share() {
358            Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
359            None => self.tree_visit(plan),
360        }
361    }
362}
363
364impl<C: ConventionMarker> PlanRef<C> {
365    pub fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef<C> {
366        let new = self.rewrite_exprs(r);
367        let inputs: Vec<PlanRef<C>> = new
368            .inputs()
369            .iter()
370            .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
371            .collect();
372        new.clone_root_with_inputs(&inputs[..])
373    }
374}
375
376pub(crate) trait VisitExprsRecursive {
377    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
378}
379
380impl<C: ConventionMarker> VisitExprsRecursive for PlanRef<C> {
381    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
382        self.visit_exprs(r);
383        self.inputs()
384            .iter()
385            .for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
386    }
387}
388
389impl<C: ConventionMarker> PlanRef<C> {
390    pub fn expect_stream_key(&self) -> &[usize] {
391        self.stream_key().unwrap_or_else(|| {
392            panic!(
393                "a stream key is expected but not exist, plan:\n{}",
394                self.explain_to_string()
395            )
396        })
397    }
398}
399
400impl LogicalPlanRef {
401    fn prune_col_inner(
402        &self,
403        required_cols: &[usize],
404        ctx: &mut ColumnPruningContext,
405    ) -> LogicalPlanRef {
406        if let Some(logical_share) = self.as_logical_share() {
407            // Check the share cache first. If cache exists, it means this is the second round of
408            // column pruning.
409            if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
410                // Piggyback share remove if its has only one parent.
411                if ctx.get_parent_num(logical_share) == 1 {
412                    let input: LogicalPlanRef = logical_share.input();
413                    return input.prune_col(required_cols, ctx);
414                }
415
416                // If it is the first visit, recursively call `prune_col` for its input and
417                // replace it.
418                if ctx.visit_share_at_first_round(self.id()) {
419                    let new_logical_share: &LogicalShare = new_share
420                        .as_logical_share()
421                        .expect("must be share operator");
422                    let new_share_input = new_logical_share.input().prune_col(
423                        &(0..new_logical_share.base.schema().len()).collect_vec(),
424                        ctx,
425                    );
426                    new_logical_share.replace_input(new_share_input);
427                }
428
429                // Calculate the new required columns based on the new share.
430                let new_required_cols: Vec<usize> = required_cols
431                    .iter()
432                    .map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
433                    .collect_vec();
434                let mapping = ColIndexMapping::with_remaining_columns(
435                    &new_required_cols,
436                    new_share.schema().len(),
437                );
438                return LogicalProject::with_mapping(new_share, mapping).into();
439            }
440
441            // `LogicalShare` can't clone, so we implement column pruning for `LogicalShare`
442            // here.
443            // Basically, we need to wait for all parents of `LogicalShare` to prune columns before
444            // we merge the required columns and prune.
445            let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
446            if parent_has_pushed == ctx.get_parent_num(logical_share) {
447                let merge_require_cols = ctx
448                    .take_required_cols(self.id())
449                    .expect("must have required columns")
450                    .into_iter()
451                    .flat_map(|x| x.into_iter())
452                    .sorted()
453                    .dedup()
454                    .collect_vec();
455                let input: LogicalPlanRef = logical_share.input();
456                let input = input.prune_col(&merge_require_cols, ctx);
457
458                // Cache the new share operator for the second round.
459                let new_logical_share = LogicalShare::create(input.clone());
460                ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());
461
462                let exprs = logical_share
463                    .base
464                    .schema()
465                    .fields
466                    .iter()
467                    .enumerate()
468                    .map(|(i, field)| {
469                        if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
470                            ExprImpl::InputRef(Box::new(InputRef::new(
471                                pos,
472                                field.data_type.clone(),
473                            )))
474                        } else {
475                            ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
476                        }
477                    })
478                    .collect_vec();
479                let project = LogicalProject::create(input, exprs);
480                logical_share.replace_input(project);
481            }
482            let mapping =
483                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
484            LogicalProject::with_mapping(self.clone(), mapping).into()
485        } else {
486            // Dispatch to dyn PlanNode instead of PlanRef.
487            let dyn_t = self.deref();
488            dyn_t.prune_col(required_cols, ctx)
489        }
490    }
491
492    fn predicate_pushdown_inner(
493        &self,
494        predicate: Condition,
495        ctx: &mut PredicatePushdownContext,
496    ) -> LogicalPlanRef {
497        if let Some(logical_share) = self.as_logical_share() {
498            // Piggyback share remove if its has only one parent.
499            if ctx.get_parent_num(logical_share) == 1 {
500                let input: LogicalPlanRef = logical_share.input();
501                return input.predicate_pushdown(predicate, ctx);
502            }
503
504            // `LogicalShare` can't clone, so we implement predicate pushdown for `LogicalShare`
505            // here.
506            // Basically, we need to wait for all parents of `LogicalShare` to push down the
507            // predicate before we merge the predicates and pushdown.
508            let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
509            if parent_has_pushed == ctx.get_parent_num(logical_share) {
510                let merge_predicate = ctx
511                    .take_predicate(self.id())
512                    .expect("must have predicate")
513                    .into_iter()
514                    .map(|mut c| Condition {
515                        conjunctions: c
516                            .conjunctions
517                            .extract_if(.., |e| {
518                                // If predicates contain now, impure or correlated input ref, don't push through share operator.
519                                // The predicate with now() function is regarded as a temporal filter predicate, which will be transformed to a temporal filter operator and can not do the OR operation with other predicates.
520                                let mut finder = ExprCorrelatedIdFinder::default();
521                                finder.visit_expr(e);
522                                e.count_nows() == 0
523                                    && e.is_pure()
524                                    && !finder.has_correlated_input_ref()
525                            })
526                            .collect(),
527                    })
528                    .reduce(|a, b| a.or(b))
529                    .unwrap();
530
531                // rewrite the *entire* predicate for `LogicalShare`
532                // before pushing down to whatever plan node(s)
533                // ps: the reason here contains a "special" optimization
534                // rather than directly apply explicit rule in stream or
535                // batch plan optimization, is because predicate push down
536                // will *instantly* push down all predicates, and rule(s)
537                // can not be applied in the middle.
538                // thus we need some on-the-fly (in the middle) rewrite
539                // technique to help with this kind of optimization.
540                let mut expr_rewriter = ExpressionSimplifyRewriter {};
541                let mut new_predicate = Condition::true_cond();
542
543                for c in merge_predicate.conjunctions {
544                    let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
545                    // rebuild the conjunctions
546                    new_predicate = new_predicate.and(c);
547                }
548
549                let input: LogicalPlanRef = logical_share.input();
550                let input = input.predicate_pushdown(new_predicate, ctx);
551                logical_share.replace_input(input);
552            }
553            LogicalFilter::create(self.clone(), predicate)
554        } else {
555            // Dispatch to dyn PlanNode instead of PlanRef.
556            let dyn_t = self.deref();
557            dyn_t.predicate_pushdown(predicate, ctx)
558        }
559    }
560}
561
562impl ColPrunable for LogicalPlanRef {
563    #[allow(clippy::let_and_return)]
564    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> LogicalPlanRef {
565        let res = self.prune_col_inner(required_cols, ctx);
566        #[cfg(debug_assertions)]
567        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
568            "column pruning",
569            &LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
570            &res,
571        );
572        res
573    }
574}
575
576impl PredicatePushdown for LogicalPlanRef {
577    #[allow(clippy::let_and_return)]
578    fn predicate_pushdown(
579        &self,
580        predicate: Condition,
581        ctx: &mut PredicatePushdownContext,
582    ) -> LogicalPlanRef {
583        #[cfg(debug_assertions)]
584        let predicate_clone = predicate.clone();
585
586        let res = self.predicate_pushdown_inner(predicate, ctx);
587
588        #[cfg(debug_assertions)]
589        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
590            "predicate push down",
591            &LogicalFilter::new(self.clone(), predicate_clone).into(),
592            &res,
593        );
594
595        res
596    }
597}
598
599impl<C: ConventionMarker> PlanRef<C> {
600    pub fn clone_root_with_inputs(&self, inputs: &[PlanRef<C>]) -> PlanRef<C> {
601        if let Some(share) = self.as_share_node() {
602            assert_eq!(inputs.len(), 1);
603            // We can't clone `LogicalShare`, but only can replace input instead.
604            share.replace_input(inputs[0].clone());
605            self.clone()
606        } else {
607            // Dispatch to dyn PlanNode instead of PlanRef.
608            let dyn_t = self.deref();
609            dyn_t.clone_with_inputs(inputs)
610        }
611    }
612}
613
614/// Implement again for the `dyn` newtype wrapper.
615impl<C: ConventionMarker> PlanRef<C> {
616    pub fn node_type(&self) -> C::PlanNodeType {
617        self.0.node_type()
618    }
619
620    pub fn plan_base(&self) -> &PlanBase<C> {
621        self.0.plan_base()
622    }
623}
624
625/// Allow access to all fields defined in [`GenericPlanRef`] for the type-erased plan node.
626// TODO: may also implement on `dyn PlanNode` directly.
627impl<C: ConventionMarker> GenericPlanRef for PlanRef<C> {
628    fn id(&self) -> PlanNodeId {
629        self.plan_base().id()
630    }
631
632    fn schema(&self) -> &Schema {
633        self.plan_base().schema()
634    }
635
636    fn stream_key(&self) -> Option<&[usize]> {
637        self.plan_base().stream_key()
638    }
639
640    fn ctx(&self) -> OptimizerContextRef {
641        self.plan_base().ctx()
642    }
643
644    fn functional_dependency(&self) -> &FunctionalDependencySet {
645        self.plan_base().functional_dependency()
646    }
647}
648
649/// Allow access to all fields defined in [`PhysicalPlanRef`] for the type-erased plan node.
650// TODO: may also implement on `dyn PlanNode` directly.
651impl PhysicalPlanRef for BatchPlanRef {
652    fn distribution(&self) -> &Distribution {
653        self.plan_base().distribution()
654    }
655}
656
657impl PhysicalPlanRef for StreamPlanRef {
658    fn distribution(&self) -> &Distribution {
659        self.plan_base().distribution()
660    }
661}
662
663/// Allow access to all fields defined in [`StreamPlanNodeMetadata`] for the type-erased plan node.
664// TODO: may also implement on `dyn PlanNode` directly.
665impl StreamPlanNodeMetadata for StreamPlanRef {
666    fn stream_kind(&self) -> StreamKind {
667        self.plan_base().stream_kind()
668    }
669
670    fn emit_on_window_close(&self) -> bool {
671        self.plan_base().emit_on_window_close()
672    }
673
674    fn watermark_columns(&self) -> &WatermarkColumns {
675        self.plan_base().watermark_columns()
676    }
677
678    fn columns_monotonicity(&self) -> &MonotonicityMap {
679        self.plan_base().columns_monotonicity()
680    }
681}
682
683/// Allow access to all fields defined in [`BatchPlanNodeMetadata`] for the type-erased plan node.
684// TODO: may also implement on `dyn PlanNode` directly.
685impl BatchPlanNodeMetadata for BatchPlanRef {
686    fn order(&self) -> &Order {
687        self.plan_base().order()
688    }
689}
690
691/// In order to let expression display id started from 1 for explaining, hidden column names and
692/// other places. We will reset expression display id to 0 and clone the whole plan to reset the
693/// schema.
694pub fn reorganize_elements_id<C: ConventionMarker>(plan: PlanRef<C>) -> PlanRef<C> {
695    let backup = plan.ctx().backup_elem_ids();
696    plan.ctx().reset_elem_ids();
697    let plan = PlanCloner::clone_whole_plan(plan);
698    plan.ctx().restore_elem_ids(backup);
699    plan
700}
701
702pub trait Explain {
703    /// Write explain the whole plan tree.
704    fn explain<'a>(&self) -> Pretty<'a>;
705
706    /// Write explain the whole plan tree with node id.
707    fn explain_with_id<'a>(&self) -> Pretty<'a>;
708
709    /// Explain the plan node and return a string.
710    fn explain_to_string(&self) -> String;
711
712    /// Explain the plan node and return a json string.
713    fn explain_to_json(&self) -> String;
714
715    /// Explain the plan node and return a xml string.
716    fn explain_to_xml(&self) -> String;
717
718    /// Explain the plan node and return a yaml string.
719    fn explain_to_yaml(&self) -> String;
720
721    /// Explain the plan node and return a dot format string.
722    fn explain_to_dot(&self) -> String;
723}
724
725impl<C: ConventionMarker> Explain for PlanRef<C> {
726    /// Write explain the whole plan tree.
727    fn explain<'a>(&self) -> Pretty<'a> {
728        let mut node = self.distill();
729        let inputs = self.inputs();
730        for input in inputs.iter().peekable() {
731            node.children.push(input.explain());
732        }
733        Pretty::Record(node)
734    }
735
736    /// Write explain the whole plan tree with node id.
737    fn explain_with_id<'a>(&self) -> Pretty<'a> {
738        let node_id = self.id();
739        let mut node = self.distill();
740        // NOTE(kwannoel): Can lead to poor performance if plan is very large,
741        // but we want to show the id first.
742        node.fields
743            .insert(0, ("id".into(), Pretty::display(&node_id.0)));
744        let inputs = self.inputs();
745        for input in inputs.iter().peekable() {
746            node.children.push(input.explain_with_id());
747        }
748        Pretty::Record(node)
749    }
750
751    /// Explain the plan node and return a string.
752    fn explain_to_string(&self) -> String {
753        let plan = reorganize_elements_id(self.clone());
754
755        let mut output = String::with_capacity(2048);
756        let mut config = pretty_config();
757        config.unicode(&mut output, &plan.explain());
758        output
759    }
760
761    /// Explain the plan node and return a json string.
762    fn explain_to_json(&self) -> String {
763        let plan = reorganize_elements_id(self.clone());
764        let explain_ir = plan.explain();
765        serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
766            .expect("failed to serialize plan to json")
767    }
768
769    /// Explain the plan node and return a xml string.
770    fn explain_to_xml(&self) -> String {
771        let plan = reorganize_elements_id(self.clone());
772        let explain_ir = plan.explain();
773        quick_xml::se::to_string(&PrettySerde(explain_ir, true))
774            .expect("failed to serialize plan to xml")
775    }
776
777    /// Explain the plan node and return a yaml string.
778    fn explain_to_yaml(&self) -> String {
779        let plan = reorganize_elements_id(self.clone());
780        let explain_ir = plan.explain();
781        serde_yaml::to_string(&PrettySerde(explain_ir, true))
782            .expect("failed to serialize plan to yaml")
783    }
784
785    /// Explain the plan node and return a dot format string.
786    fn explain_to_dot(&self) -> String {
787        let plan = reorganize_elements_id(self.clone());
788        let explain_ir = plan.explain_with_id();
789        let mut graph = Graph::<String, String>::new();
790        let mut nodes = HashMap::new();
791        build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
792        let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
793        dot.to_string()
794    }
795}
796
797impl<C: ConventionMarker> PlanRef<C> {
798    pub fn as_share_node(&self) -> Option<&C::ShareNode> {
799        C::as_share(self)
800    }
801}
802
803pub(crate) fn pretty_config() -> PrettyConfig {
804    PrettyConfig {
805        indent: 3,
806        need_boundaries: false,
807        width: 2048,
808        reduced_spaces: true,
809    }
810}
811
812macro_rules! impl_generic_plan_ref_method {
813    ($($convention:ident),+) => {
814        paste! {
815            $(
816                /// Directly implement methods for `PlanNode` to access the fields defined in [`GenericPlanRef`].
817                impl dyn [<$convention PlanNode>] {
818                    pub fn id(&self) -> PlanNodeId {
819                        self.plan_base().id()
820                    }
821
822                    pub fn ctx(&self) -> OptimizerContextRef {
823                        self.plan_base().ctx().clone()
824                    }
825
826                    pub fn schema(&self) -> &Schema {
827                        self.plan_base().schema()
828                    }
829
830                    pub fn stream_key(&self) -> Option<&[usize]> {
831                        self.plan_base().stream_key()
832                    }
833
834                    pub fn functional_dependency(&self) -> &FunctionalDependencySet {
835                        self.plan_base().functional_dependency()
836                    }
837
838                    pub fn explain_myself_to_string(&self) -> String {
839                        self.distill_to_string()
840                    }
841                }
842            )+
843        }
844    };
845}
846
847impl_generic_plan_ref_method!(Batch, Stream, Logical);
848
849/// Recursion depth threshold for plan node visitor to send notice to user.
850pub const PLAN_DEPTH_THRESHOLD: usize = 30;
851/// Notice message for plan node visitor to send to user when the depth threshold is reached.
852pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
853Consider simplifying or splitting the query if you encounter any issues.";
854
855impl dyn StreamPlanNode {
856    /// Serialize the plan node and its children to a stream plan proto.
857    ///
858    /// Note that some operators has their own implementation of `to_stream_prost`. We have a
859    /// hook inside to do some ad-hoc things.
860    pub fn to_stream_prost(
861        &self,
862        state: &mut BuildFragmentGraphState,
863    ) -> SchedulerResult<PbStreamPlan> {
864        recursive::tracker!().recurse(|t| {
865            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
866                notice_to_user(PLAN_TOO_DEEP_NOTICE);
867            }
868
869            use stream::prelude::*;
870
871            if let Some(stream_table_scan) = self.as_stream_table_scan() {
872                return stream_table_scan.adhoc_to_stream_prost(state);
873            }
874            if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
875                return stream_cdc_table_scan.adhoc_to_stream_prost(state);
876            }
877            if let Some(stream_source_scan) = self.as_stream_source_scan() {
878                return stream_source_scan.adhoc_to_stream_prost(state);
879            }
880            if let Some(stream_share) = self.as_stream_share() {
881                return stream_share.adhoc_to_stream_prost(state);
882            }
883
884            let node = Some(self.try_to_stream_prost_body(state)?);
885            let input = self
886                .inputs()
887                .into_iter()
888                .map(|plan| plan.to_stream_prost(state))
889                .try_collect()?;
890            // TODO: support pk_indices and operator_id
891            Ok(PbStreamPlan {
892                input,
893                identity: self.explain_myself_to_string(),
894                node_body: node,
895                operator_id: self.id().0 as _,
896                stream_key: self
897                    .stream_key()
898                    .unwrap_or_default()
899                    .iter()
900                    .map(|x| *x as u32)
901                    .collect(),
902                fields: self.schema().to_prost(),
903                append_only: self.plan_base().append_only(),
904            })
905        })
906    }
907}
908
909impl dyn BatchPlanNode {
910    /// Serialize the plan node and its children to a batch plan proto.
911    pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
912        self.to_batch_prost_identity(true)
913    }
914
915    /// Serialize the plan node and its children to a batch plan proto without the identity field
916    /// (for testing).
917    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
918        recursive::tracker!().recurse(|t| {
919            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
920                notice_to_user(PLAN_TOO_DEEP_NOTICE);
921            }
922
923            let node_body = Some(self.try_to_batch_prost_body()?);
924            let children = self
925                .inputs()
926                .into_iter()
927                .map(|plan| plan.to_batch_prost_identity(identity))
928                .try_collect()?;
929            Ok(PbBatchPlan {
930                children,
931                identity: if identity {
932                    self.explain_myself_to_string()
933                } else {
934                    "".into()
935                },
936                node_body,
937            })
938        })
939    }
940}
941
942mod plan_base;
943pub use plan_base::*;
944#[macro_use]
945mod plan_tree_node;
946pub use plan_tree_node::*;
947mod col_pruning;
948pub use col_pruning::*;
949mod expr_rewritable;
950pub use expr_rewritable::*;
951mod expr_visitable;
952
953mod convert;
954pub use convert::*;
955mod eq_join_predicate;
956pub use eq_join_predicate::*;
957mod to_prost;
958pub use to_prost::*;
959mod predicate_pushdown;
960pub use predicate_pushdown::*;
961mod merge_eq_nodes;
962pub use merge_eq_nodes::*;
963
964pub mod batch;
965pub mod generic;
966pub mod stream;
967
968pub use generic::{PlanAggCall, PlanAggCallDisplay};
969
970mod batch_delete;
971mod batch_exchange;
972mod batch_expand;
973mod batch_filter;
974mod batch_group_topn;
975mod batch_hash_agg;
976mod batch_hash_join;
977mod batch_hop_window;
978mod batch_insert;
979mod batch_limit;
980mod batch_log_seq_scan;
981mod batch_lookup_join;
982mod batch_max_one_row;
983mod batch_nested_loop_join;
984mod batch_over_window;
985mod batch_project;
986mod batch_project_set;
987mod batch_seq_scan;
988mod batch_simple_agg;
989mod batch_sort;
990mod batch_sort_agg;
991mod batch_source;
992mod batch_sys_seq_scan;
993mod batch_table_function;
994mod batch_topn;
995mod batch_union;
996mod batch_update;
997mod batch_values;
998mod logical_agg;
999mod logical_apply;
1000mod logical_cdc_scan;
1001mod logical_changelog;
1002mod logical_cte_ref;
1003mod logical_dedup;
1004mod logical_delete;
1005mod logical_except;
1006mod logical_expand;
1007mod logical_filter;
1008mod logical_hop_window;
1009mod logical_insert;
1010mod logical_intersect;
1011mod logical_join;
1012mod logical_kafka_scan;
1013mod logical_limit;
1014mod logical_max_one_row;
1015mod logical_multi_join;
1016mod logical_now;
1017mod logical_over_window;
1018mod logical_project;
1019mod logical_project_set;
1020mod logical_recursive_union;
1021mod logical_scan;
1022mod logical_share;
1023mod logical_source;
1024mod logical_sys_scan;
1025mod logical_table_function;
1026mod logical_topn;
1027mod logical_union;
1028mod logical_update;
1029mod logical_values;
1030mod stream_asof_join;
1031mod stream_changelog;
1032mod stream_dedup;
1033mod stream_delta_join;
1034mod stream_dml;
1035mod stream_dynamic_filter;
1036mod stream_eowc_over_window;
1037mod stream_exchange;
1038mod stream_expand;
1039mod stream_filter;
1040mod stream_fs_fetch;
1041mod stream_global_approx_percentile;
1042mod stream_group_topn;
1043mod stream_hash_agg;
1044mod stream_hash_join;
1045mod stream_hop_window;
1046mod stream_join_common;
1047mod stream_local_approx_percentile;
1048mod stream_materialize;
1049mod stream_materialized_exprs;
1050mod stream_now;
1051mod stream_over_window;
1052mod stream_project;
1053mod stream_project_set;
1054mod stream_row_id_gen;
1055mod stream_row_merge;
1056mod stream_simple_agg;
1057mod stream_sink;
1058mod stream_sort;
1059mod stream_source;
1060mod stream_source_scan;
1061mod stream_stateless_simple_agg;
1062mod stream_sync_log_store;
1063mod stream_table_scan;
1064mod stream_topn;
1065mod stream_values;
1066mod stream_watermark_filter;
1067
1068mod batch_file_scan;
1069mod batch_iceberg_scan;
1070mod batch_kafka_scan;
1071mod batch_postgres_query;
1072
1073mod batch_mysql_query;
1074mod derive;
1075mod logical_file_scan;
1076mod logical_iceberg_scan;
1077mod logical_postgres_query;
1078
1079mod logical_mysql_query;
1080mod stream_cdc_table_scan;
1081mod stream_share;
1082mod stream_temporal_join;
1083mod stream_union;
1084pub mod utils;
1085
1086pub use batch_delete::BatchDelete;
1087pub use batch_exchange::BatchExchange;
1088pub use batch_expand::BatchExpand;
1089pub use batch_file_scan::BatchFileScan;
1090pub use batch_filter::BatchFilter;
1091pub use batch_group_topn::BatchGroupTopN;
1092pub use batch_hash_agg::BatchHashAgg;
1093pub use batch_hash_join::BatchHashJoin;
1094pub use batch_hop_window::BatchHopWindow;
1095pub use batch_iceberg_scan::BatchIcebergScan;
1096pub use batch_insert::BatchInsert;
1097pub use batch_kafka_scan::BatchKafkaScan;
1098pub use batch_limit::BatchLimit;
1099pub use batch_log_seq_scan::BatchLogSeqScan;
1100pub use batch_lookup_join::BatchLookupJoin;
1101pub use batch_max_one_row::BatchMaxOneRow;
1102pub use batch_mysql_query::BatchMySqlQuery;
1103pub use batch_nested_loop_join::BatchNestedLoopJoin;
1104pub use batch_over_window::BatchOverWindow;
1105pub use batch_postgres_query::BatchPostgresQuery;
1106pub use batch_project::BatchProject;
1107pub use batch_project_set::BatchProjectSet;
1108pub use batch_seq_scan::BatchSeqScan;
1109pub use batch_simple_agg::BatchSimpleAgg;
1110pub use batch_sort::BatchSort;
1111pub use batch_sort_agg::BatchSortAgg;
1112pub use batch_source::BatchSource;
1113pub use batch_sys_seq_scan::BatchSysSeqScan;
1114pub use batch_table_function::BatchTableFunction;
1115pub use batch_topn::BatchTopN;
1116pub use batch_union::BatchUnion;
1117pub use batch_update::BatchUpdate;
1118pub use batch_values::BatchValues;
1119pub use logical_agg::LogicalAgg;
1120pub use logical_apply::LogicalApply;
1121pub use logical_cdc_scan::LogicalCdcScan;
1122pub use logical_changelog::LogicalChangeLog;
1123pub use logical_cte_ref::LogicalCteRef;
1124pub use logical_dedup::LogicalDedup;
1125pub use logical_delete::LogicalDelete;
1126pub use logical_except::LogicalExcept;
1127pub use logical_expand::LogicalExpand;
1128pub use logical_file_scan::LogicalFileScan;
1129pub use logical_filter::LogicalFilter;
1130pub use logical_hop_window::LogicalHopWindow;
1131pub use logical_iceberg_scan::LogicalIcebergScan;
1132pub use logical_insert::LogicalInsert;
1133pub use logical_intersect::LogicalIntersect;
1134pub use logical_join::LogicalJoin;
1135pub use logical_kafka_scan::LogicalKafkaScan;
1136pub use logical_limit::LogicalLimit;
1137pub use logical_max_one_row::LogicalMaxOneRow;
1138pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
1139pub use logical_mysql_query::LogicalMySqlQuery;
1140pub use logical_now::LogicalNow;
1141pub use logical_over_window::LogicalOverWindow;
1142pub use logical_postgres_query::LogicalPostgresQuery;
1143pub use logical_project::LogicalProject;
1144pub use logical_project_set::LogicalProjectSet;
1145pub use logical_recursive_union::LogicalRecursiveUnion;
1146pub use logical_scan::LogicalScan;
1147pub use logical_share::LogicalShare;
1148pub use logical_source::LogicalSource;
1149pub use logical_sys_scan::LogicalSysScan;
1150pub use logical_table_function::LogicalTableFunction;
1151pub use logical_topn::LogicalTopN;
1152pub use logical_union::LogicalUnion;
1153pub use logical_update::LogicalUpdate;
1154pub use logical_values::LogicalValues;
1155pub use stream_asof_join::StreamAsOfJoin;
1156pub use stream_cdc_table_scan::StreamCdcTableScan;
1157pub use stream_changelog::StreamChangeLog;
1158pub use stream_dedup::StreamDedup;
1159pub use stream_delta_join::StreamDeltaJoin;
1160pub use stream_dml::StreamDml;
1161pub use stream_dynamic_filter::StreamDynamicFilter;
1162pub use stream_eowc_over_window::StreamEowcOverWindow;
1163pub use stream_exchange::StreamExchange;
1164pub use stream_expand::StreamExpand;
1165pub use stream_filter::StreamFilter;
1166pub use stream_fs_fetch::StreamFsFetch;
1167pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
1168pub use stream_group_topn::StreamGroupTopN;
1169pub use stream_hash_agg::StreamHashAgg;
1170pub use stream_hash_join::StreamHashJoin;
1171pub use stream_hop_window::StreamHopWindow;
1172use stream_join_common::StreamJoinCommon;
1173pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
1174pub use stream_materialize::StreamMaterialize;
1175pub use stream_materialized_exprs::StreamMaterializedExprs;
1176pub use stream_now::StreamNow;
1177pub use stream_over_window::StreamOverWindow;
1178pub use stream_project::StreamProject;
1179pub use stream_project_set::StreamProjectSet;
1180pub use stream_row_id_gen::StreamRowIdGen;
1181pub use stream_row_merge::StreamRowMerge;
1182pub use stream_share::StreamShare;
1183pub use stream_simple_agg::StreamSimpleAgg;
1184pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
1185pub use stream_sort::StreamEowcSort;
1186pub use stream_source::StreamSource;
1187pub use stream_source_scan::StreamSourceScan;
1188pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
1189pub use stream_sync_log_store::StreamSyncLogStore;
1190pub use stream_table_scan::StreamTableScan;
1191pub use stream_temporal_join::StreamTemporalJoin;
1192pub use stream_topn::StreamTopN;
1193pub use stream_union::StreamUnion;
1194pub use stream_values::StreamValues;
1195pub use stream_watermark_filter::StreamWatermarkFilter;
1196
1197use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
1198use crate::optimizer::optimizer_context::OptimizerContextRef;
1199use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
1200use crate::optimizer::plan_rewriter::PlanCloner;
1201use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
1202use crate::scheduler::SchedulerResult;
1203use crate::stream_fragmenter::BuildFragmentGraphState;
1204use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};
1205
1206/// `for_all_plan_nodes` includes all plan nodes. If you added a new plan node
1207/// inside the project, be sure to add here and in its conventions like `for_logical_plan_nodes`
1208///
1209/// Every tuple has two elements, where `{ convention, name }`
1210/// You can use it as follows
1211/// ```rust
1212/// macro_rules! use_plan {
1213///     ($({ $convention:ident, $name:ident }),*) => {};
1214/// }
1215/// risingwave_frontend::for_all_plan_nodes! { use_plan }
1216/// ```
1217/// See the following implementations for example.
1218#[macro_export]
1219macro_rules! for_all_plan_nodes {
1220    ($macro:path $(,$rest:tt)*) => {
1221        $macro! {
1222              { Logical, Agg }
1223            , { Logical, Apply }
1224            , { Logical, Filter }
1225            , { Logical, Project }
1226            , { Logical, Scan }
1227            , { Logical, CdcScan }
1228            , { Logical, SysScan }
1229            , { Logical, Source }
1230            , { Logical, Insert }
1231            , { Logical, Delete }
1232            , { Logical, Update }
1233            , { Logical, Join }
1234            , { Logical, Values }
1235            , { Logical, Limit }
1236            , { Logical, TopN }
1237            , { Logical, HopWindow }
1238            , { Logical, TableFunction }
1239            , { Logical, MultiJoin }
1240            , { Logical, Expand }
1241            , { Logical, ProjectSet }
1242            , { Logical, Union }
1243            , { Logical, OverWindow }
1244            , { Logical, Share }
1245            , { Logical, Now }
1246            , { Logical, Dedup }
1247            , { Logical, Intersect }
1248            , { Logical, Except }
1249            , { Logical, MaxOneRow }
1250            , { Logical, KafkaScan }
1251            , { Logical, IcebergScan }
1252            , { Logical, RecursiveUnion }
1253            , { Logical, CteRef }
1254            , { Logical, ChangeLog }
1255            , { Logical, FileScan }
1256            , { Logical, PostgresQuery }
1257            , { Logical, MySqlQuery }
1258            , { Batch, SimpleAgg }
1259            , { Batch, HashAgg }
1260            , { Batch, SortAgg }
1261            , { Batch, Project }
1262            , { Batch, Filter }
1263            , { Batch, Insert }
1264            , { Batch, Delete }
1265            , { Batch, Update }
1266            , { Batch, SeqScan }
1267            , { Batch, SysSeqScan }
1268            , { Batch, LogSeqScan }
1269            , { Batch, HashJoin }
1270            , { Batch, NestedLoopJoin }
1271            , { Batch, Values }
1272            , { Batch, Sort }
1273            , { Batch, Exchange }
1274            , { Batch, Limit }
1275            , { Batch, TopN }
1276            , { Batch, HopWindow }
1277            , { Batch, TableFunction }
1278            , { Batch, Expand }
1279            , { Batch, LookupJoin }
1280            , { Batch, ProjectSet }
1281            , { Batch, Union }
1282            , { Batch, GroupTopN }
1283            , { Batch, Source }
1284            , { Batch, OverWindow }
1285            , { Batch, MaxOneRow }
1286            , { Batch, KafkaScan }
1287            , { Batch, IcebergScan }
1288            , { Batch, FileScan }
1289            , { Batch, PostgresQuery }
1290            , { Batch, MySqlQuery }
1291            , { Stream, Project }
1292            , { Stream, Filter }
1293            , { Stream, TableScan }
1294            , { Stream, CdcTableScan }
1295            , { Stream, Sink }
1296            , { Stream, Source }
1297            , { Stream, SourceScan }
1298            , { Stream, HashJoin }
1299            , { Stream, Exchange }
1300            , { Stream, HashAgg }
1301            , { Stream, SimpleAgg }
1302            , { Stream, StatelessSimpleAgg }
1303            , { Stream, Materialize }
1304            , { Stream, TopN }
1305            , { Stream, HopWindow }
1306            , { Stream, DeltaJoin }
1307            , { Stream, Expand }
1308            , { Stream, DynamicFilter }
1309            , { Stream, ProjectSet }
1310            , { Stream, GroupTopN }
1311            , { Stream, Union }
1312            , { Stream, RowIdGen }
1313            , { Stream, Dml }
1314            , { Stream, Now }
1315            , { Stream, Share }
1316            , { Stream, WatermarkFilter }
1317            , { Stream, TemporalJoin }
1318            , { Stream, Values }
1319            , { Stream, Dedup }
1320            , { Stream, EowcOverWindow }
1321            , { Stream, EowcSort }
1322            , { Stream, OverWindow }
1323            , { Stream, FsFetch }
1324            , { Stream, ChangeLog }
1325            , { Stream, GlobalApproxPercentile }
1326            , { Stream, LocalApproxPercentile }
1327            , { Stream, RowMerge }
1328            , { Stream, AsOfJoin }
1329            , { Stream, SyncLogStore }
1330            , { Stream, MaterializedExprs }
1331            $(,$rest)*
1332        }
1333    };
1334}
1335
1336#[macro_export]
1337macro_rules! for_each_convention_all_plan_nodes {
1338    ($macro:path $(,$rest:tt)*) => {
1339        $crate::for_all_plan_nodes! {
1340            $crate::for_each_convention_all_plan_nodes
1341            , $macro
1342            $(,$rest)*
1343        }
1344    };
1345    (
1346        $( { Logical, $logical_name:ident } ),*
1347        , $( { Batch, $batch_name:ident } ),*
1348        , $( { Stream, $stream_name:ident } ),*
1349        , $macro:path $(,$rest:tt)*
1350    ) => {
1351        $macro! {
1352            {
1353                Logical, { $( $logical_name ),* },
1354                Batch, { $( $batch_name ),* },
1355                Stream, { $( $stream_name ),* }
1356            }
1357            $(,$rest)*
1358        }
1359    }
1360}
1361
1362/// impl `PlanNodeType` fn for each node.
1363macro_rules! impl_plan_node_meta {
1364    ({
1365        $( $convention:ident, { $( $name:ident ),* }),*
1366    }) => {
1367        paste!{
1368            $(
1369                /// each enum value represent a `PlanNode` struct type, help us to dispatch and downcast
1370                #[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
1371                pub enum [<$convention PlanNodeType>] {
1372                    $( [<$convention $name>] ),*
1373                }
1374            )*
1375            $(
1376                $(impl PlanNodeMeta for [<$convention $name>] {
1377                    type Convention = $convention;
1378                    const NODE_TYPE: [<$convention PlanNodeType>] = [<$convention PlanNodeType>]::[<$convention $name>];
1379
1380                    fn plan_base(&self) -> &PlanBase<$convention> {
1381                        &self.base
1382                    }
1383                }
1384
1385                impl Deref for [<$convention $name>] {
1386                    type Target = PlanBase<$convention>;
1387
1388                    fn deref(&self) -> &Self::Target {
1389                        &self.base
1390                    }
1391                })*
1392            )*
1393        }
1394    }
1395}
1396
1397for_each_convention_all_plan_nodes! { impl_plan_node_meta }
1398
1399macro_rules! impl_plan_node {
1400    ($({ $convention:ident, $name:ident }),*) => {
1401        paste!{
1402            $(impl [<$convention PlanNode>] for [<$convention $name>] { })*
1403        }
1404    }
1405}
1406
1407for_all_plan_nodes! { impl_plan_node }
1408
1409/// impl plan node downcast fn for each node.
1410macro_rules! impl_down_cast_fn {
1411    ({
1412        $( $convention:ident, { $( $name:ident ),* }),*
1413    }) => {
1414        paste!{
1415            $(
1416                impl dyn [<$convention PlanNode>] {
1417                    $( pub fn [< as_ $convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
1418                        self.downcast_ref::<[<$convention $name>]>()
1419                    } )*
1420                }
1421            )*
1422        }
1423    }
1424}
1425
1426for_each_convention_all_plan_nodes! { impl_down_cast_fn }