1use std::collections::HashMap;
31use std::fmt::Debug;
32use std::hash::Hash;
33use std::ops::Deref;
34use std::rc::Rc;
35
36use downcast_rs::{Downcast, impl_downcast};
37use dyn_clone::DynClone;
38use itertools::Itertools;
39use paste::paste;
40use petgraph::dot::{Config, Dot};
41use petgraph::graph::Graph;
42use pretty_xmlish::{Pretty, PrettyConfig};
43use risingwave_common::catalog::Schema;
44use risingwave_common::util::recursive::{self, Recurse};
45use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
46use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
47use serde::Serialize;
48use smallvec::SmallVec;
49
50use self::batch::BatchPlanRef;
51use self::generic::{GenericPlanRef, PhysicalPlanRef};
52use self::stream::StreamPlanRef;
53use self::utils::Distill;
54use super::property::{
55 Distribution, FunctionalDependencySet, MonotonicityMap, Order, WatermarkColumns,
56};
57use crate::error::{ErrorCode, Result};
58use crate::optimizer::ExpressionSimplifyRewriter;
59use crate::session::current::notice_to_user;
60use crate::utils::{PrettySerde, build_graph_from_pretty};
61
62pub trait ConventionMarker: 'static + Sized {
66 type Extra: 'static + Eq + Hash + Clone + Debug;
68
69 fn value() -> Convention;
71}
72
73pub struct Logical;
75impl ConventionMarker for Logical {
76 type Extra = plan_base::NoExtra;
77
78 fn value() -> Convention {
79 Convention::Logical
80 }
81}
82
83pub struct Batch;
85impl ConventionMarker for Batch {
86 type Extra = plan_base::BatchExtra;
87
88 fn value() -> Convention {
89 Convention::Batch
90 }
91}
92
93pub struct Stream;
95impl ConventionMarker for Stream {
96 type Extra = plan_base::StreamExtra;
97
98 fn value() -> Convention {
99 Convention::Stream
100 }
101}
102
103pub trait PlanNodeMeta {
105 type Convention: ConventionMarker;
106
107 const NODE_TYPE: PlanNodeType;
108
109 fn plan_base(&self) -> &PlanBase<Self::Convention>;
111
112 fn plan_base_ref(&self) -> PlanBaseRef<'_>;
117}
118
119mod plan_node_meta {
121 use super::*;
122
123 pub trait AnyPlanNodeMeta {
127 fn node_type(&self) -> PlanNodeType;
128 fn plan_base(&self) -> PlanBaseRef<'_>;
129 fn convention(&self) -> Convention;
130 }
131
132 impl<P> AnyPlanNodeMeta for P
134 where
135 P: PlanNodeMeta,
136 {
137 fn node_type(&self) -> PlanNodeType {
138 P::NODE_TYPE
139 }
140
141 fn plan_base(&self) -> PlanBaseRef<'_> {
142 PlanNodeMeta::plan_base_ref(self)
143 }
144
145 fn convention(&self) -> Convention {
146 P::Convention::value()
147 }
148 }
149}
150use plan_node_meta::AnyPlanNodeMeta;
151
152pub trait PlanNode:
157 PlanTreeNode
158 + DynClone
159 + DynEq
160 + DynHash
161 + Distill
162 + Debug
163 + Downcast
164 + ColPrunable
165 + ExprRewritable
166 + ExprVisitable
167 + ToBatch
168 + ToStream
169 + ToDistributedBatch
170 + ToPb
171 + ToLocalBatch
172 + PredicatePushdown
173 + AnyPlanNodeMeta
174{
175}
176
177impl Hash for dyn PlanNode {
178 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
179 self.dyn_hash(state);
180 }
181}
182
183impl PartialEq for dyn PlanNode {
184 fn eq(&self, other: &Self) -> bool {
185 self.dyn_eq(other.as_dyn_eq())
186 }
187}
188
189impl Eq for dyn PlanNode {}
190
191impl_downcast!(PlanNode);
192
193#[allow(clippy::derived_hash_with_manual_eq)]
196#[derive(Clone, Debug, Eq, Hash)]
197pub struct PlanRef(Rc<dyn PlanNode>);
198
199#[allow(clippy::op_ref)]
202impl PartialEq for PlanRef {
203 fn eq(&self, other: &Self) -> bool {
204 &self.0 == &other.0
205 }
206}
207
208impl Deref for PlanRef {
209 type Target = dyn PlanNode;
210
211 fn deref(&self) -> &Self::Target {
212 self.0.deref()
213 }
214}
215
216impl<T: PlanNode> From<T> for PlanRef {
217 fn from(value: T) -> Self {
218 PlanRef(Rc::new(value))
219 }
220}
221
222impl Layer for PlanRef {
223 type Sub = Self;
224
225 fn map<F>(self, f: F) -> Self
226 where
227 F: FnMut(Self::Sub) -> Self::Sub,
228 {
229 self.clone_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
230 }
231
232 fn descent<F>(&self, f: F)
233 where
234 F: FnMut(&Self::Sub),
235 {
236 self.inputs().iter().for_each(f);
237 }
238}
239
240#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
241pub struct PlanNodeId(pub i32);
242
243pub trait EndoPlan: Endo<PlanRef> {
253 fn cached<F>(&mut self, plan: PlanRef, f: F) -> PlanRef
258 where
259 F: FnMut(&mut Self) -> PlanRef;
260
261 fn dag_apply(&mut self, plan: PlanRef) -> PlanRef {
262 match plan.as_logical_share() {
263 Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
264 None => self.tree_apply(plan),
265 }
266 }
267}
268
269pub trait VisitPlan: Visit<PlanRef> {
275 fn visited<F>(&mut self, plan: &PlanRef, f: F)
279 where
280 F: FnMut(&mut Self);
281
282 fn dag_visit(&mut self, plan: &PlanRef) {
283 match plan.as_logical_share() {
284 Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
285 None => self.tree_visit(plan),
286 }
287 }
288}
289
290#[derive(Clone, Copy, Debug, PartialEq)]
291pub enum Convention {
292 Logical,
293 Batch,
294 Stream,
295}
296
297pub(crate) trait RewriteExprsRecursive {
298 fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef;
299}
300
301impl RewriteExprsRecursive for PlanRef {
302 fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef {
303 let new = self.rewrite_exprs(r);
304 let inputs: Vec<PlanRef> = new
305 .inputs()
306 .iter()
307 .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
308 .collect();
309 new.clone_with_inputs(&inputs[..])
310 }
311}
312
313pub(crate) trait VisitExprsRecursive {
314 fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
315}
316
317impl VisitExprsRecursive for PlanRef {
318 fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
319 self.visit_exprs(r);
320 self.inputs()
321 .iter()
322 .for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
323 }
324}
325
326impl PlanRef {
327 pub fn expect_stream_key(&self) -> &[usize] {
328 self.stream_key().unwrap_or_else(|| {
329 panic!(
330 "a stream key is expected but not exist, plan:\n{}",
331 self.explain_to_string()
332 )
333 })
334 }
335
336 fn prune_col_inner(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
337 if let Some(logical_share) = self.as_logical_share() {
338 if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
341 if ctx.get_parent_num(logical_share) == 1 {
343 let input: PlanRef = logical_share.input();
344 return input.prune_col(required_cols, ctx);
345 }
346
347 if ctx.visit_share_at_second_round(self.id()) {
350 let new_logical_share: &LogicalShare = new_share
351 .as_logical_share()
352 .expect("must be share operator");
353 let new_share_input = new_logical_share.input().prune_col(
354 &(0..new_logical_share.base.schema().len()).collect_vec(),
355 ctx,
356 );
357 new_logical_share.replace_input(new_share_input);
358 }
359
360 let new_required_cols: Vec<usize> = required_cols
362 .iter()
363 .map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
364 .collect_vec();
365 let mapping = ColIndexMapping::with_remaining_columns(
366 &new_required_cols,
367 new_share.schema().len(),
368 );
369 return LogicalProject::with_mapping(new_share, mapping).into();
370 }
371
372 let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
377 if parent_has_pushed == ctx.get_parent_num(logical_share) {
378 let merge_require_cols = ctx
379 .take_required_cols(self.id())
380 .expect("must have required columns")
381 .into_iter()
382 .flat_map(|x| x.into_iter())
383 .sorted()
384 .dedup()
385 .collect_vec();
386 let input: PlanRef = logical_share.input();
387 let input = input.prune_col(&merge_require_cols, ctx);
388
389 let new_logical_share = LogicalShare::create(input.clone());
391 ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());
392
393 let exprs = logical_share
394 .base
395 .schema()
396 .fields
397 .iter()
398 .enumerate()
399 .map(|(i, field)| {
400 if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
401 ExprImpl::InputRef(Box::new(InputRef::new(
402 pos,
403 field.data_type.clone(),
404 )))
405 } else {
406 ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
407 }
408 })
409 .collect_vec();
410 let project = LogicalProject::create(input, exprs);
411 logical_share.replace_input(project);
412 }
413 let mapping =
414 ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
415 LogicalProject::with_mapping(self.clone(), mapping).into()
416 } else {
417 let dyn_t = self.deref();
419 dyn_t.prune_col(required_cols, ctx)
420 }
421 }
422
423 fn predicate_pushdown_inner(
424 &self,
425 predicate: Condition,
426 ctx: &mut PredicatePushdownContext,
427 ) -> PlanRef {
428 if let Some(logical_share) = self.as_logical_share() {
429 if ctx.get_parent_num(logical_share) == 1 {
431 let input: PlanRef = logical_share.input();
432 return input.predicate_pushdown(predicate, ctx);
433 }
434
435 let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
440 if parent_has_pushed == ctx.get_parent_num(logical_share) {
441 let merge_predicate = ctx
442 .take_predicate(self.id())
443 .expect("must have predicate")
444 .into_iter()
445 .map(|mut c| Condition {
446 conjunctions: c
447 .conjunctions
448 .extract_if(.., |e| {
449 let mut finder = ExprCorrelatedIdFinder::default();
452 finder.visit_expr(e);
453 e.count_nows() == 0
454 && e.is_pure()
455 && !finder.has_correlated_input_ref()
456 })
457 .collect(),
458 })
459 .reduce(|a, b| a.or(b))
460 .unwrap();
461
462 let mut expr_rewriter = ExpressionSimplifyRewriter {};
472 let mut new_predicate = Condition::true_cond();
473
474 for c in merge_predicate.conjunctions {
475 let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
476 new_predicate = new_predicate.and(c);
478 }
479
480 let input: PlanRef = logical_share.input();
481 let input = input.predicate_pushdown(new_predicate, ctx);
482 logical_share.replace_input(input);
483 }
484 LogicalFilter::create(self.clone(), predicate)
485 } else {
486 let dyn_t = self.deref();
488 dyn_t.predicate_pushdown(predicate, ctx)
489 }
490 }
491}
492
493impl ColPrunable for PlanRef {
494 #[allow(clippy::let_and_return)]
495 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
496 let res = self.prune_col_inner(required_cols, ctx);
497 #[cfg(debug_assertions)]
498 super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
499 "column pruning",
500 &LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
501 &res,
502 );
503 res
504 }
505}
506
507impl PredicatePushdown for PlanRef {
508 #[allow(clippy::let_and_return)]
509 fn predicate_pushdown(
510 &self,
511 predicate: Condition,
512 ctx: &mut PredicatePushdownContext,
513 ) -> PlanRef {
514 #[cfg(debug_assertions)]
515 let predicate_clone = predicate.clone();
516
517 let res = self.predicate_pushdown_inner(predicate, ctx);
518
519 #[cfg(debug_assertions)]
520 super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
521 "predicate push down",
522 &LogicalFilter::new(self.clone(), predicate_clone).into(),
523 &res,
524 );
525
526 res
527 }
528}
529
530impl PlanTreeNode for PlanRef {
531 fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
532 let dyn_t = self.deref();
534 dyn_t.inputs()
535 }
536
537 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
538 if let Some(logical_share) = self.clone().as_logical_share() {
539 assert_eq!(inputs.len(), 1);
540 logical_share.replace_input(inputs[0].clone());
542 self.clone()
543 } else if let Some(stream_share) = self.clone().as_stream_share() {
544 assert_eq!(inputs.len(), 1);
545 stream_share.replace_input(inputs[0].clone());
547 self.clone()
548 } else {
549 let dyn_t = self.deref();
551 dyn_t.clone_with_inputs(inputs)
552 }
553 }
554}
555
556impl AnyPlanNodeMeta for PlanRef {
558 fn node_type(&self) -> PlanNodeType {
559 self.0.node_type()
560 }
561
562 fn plan_base(&self) -> PlanBaseRef<'_> {
563 self.0.plan_base()
564 }
565
566 fn convention(&self) -> Convention {
567 self.0.convention()
568 }
569}
570
571impl GenericPlanRef for PlanRef {
574 fn id(&self) -> PlanNodeId {
575 self.plan_base().id()
576 }
577
578 fn schema(&self) -> &Schema {
579 self.plan_base().schema()
580 }
581
582 fn stream_key(&self) -> Option<&[usize]> {
583 self.plan_base().stream_key()
584 }
585
586 fn ctx(&self) -> OptimizerContextRef {
587 self.plan_base().ctx()
588 }
589
590 fn functional_dependency(&self) -> &FunctionalDependencySet {
591 self.plan_base().functional_dependency()
592 }
593}
594
595impl PhysicalPlanRef for PlanRef {
598 fn distribution(&self) -> &Distribution {
599 self.plan_base().distribution()
600 }
601}
602
603impl StreamPlanRef for PlanRef {
606 fn append_only(&self) -> bool {
607 self.plan_base().append_only()
608 }
609
610 fn emit_on_window_close(&self) -> bool {
611 self.plan_base().emit_on_window_close()
612 }
613
614 fn watermark_columns(&self) -> &WatermarkColumns {
615 self.plan_base().watermark_columns()
616 }
617
618 fn columns_monotonicity(&self) -> &MonotonicityMap {
619 self.plan_base().columns_monotonicity()
620 }
621}
622
623impl BatchPlanRef for PlanRef {
626 fn order(&self) -> &Order {
627 self.plan_base().order()
628 }
629}
630
631pub fn reorganize_elements_id(plan: PlanRef) -> PlanRef {
635 let backup = plan.ctx().backup_elem_ids();
636 plan.ctx().reset_elem_ids();
637 let plan = PlanCloner::clone_whole_plan(plan);
638 plan.ctx().restore_elem_ids(backup);
639 plan
640}
641
642pub trait Explain {
643 fn explain<'a>(&self) -> Pretty<'a>;
645
646 fn explain_with_id<'a>(&self) -> Pretty<'a>;
648
649 fn explain_to_string(&self) -> String;
651
652 fn explain_to_json(&self) -> String;
654
655 fn explain_to_xml(&self) -> String;
657
658 fn explain_to_yaml(&self) -> String;
660
661 fn explain_to_dot(&self) -> String;
663}
664
665impl Explain for PlanRef {
666 fn explain<'a>(&self) -> Pretty<'a> {
668 let mut node = self.distill();
669 let inputs = self.inputs();
670 for input in inputs.iter().peekable() {
671 node.children.push(input.explain());
672 }
673 Pretty::Record(node)
674 }
675
676 fn explain_with_id<'a>(&self) -> Pretty<'a> {
678 let node_id = self.id();
679 let mut node = self.distill();
680 node.fields
683 .insert(0, ("id".into(), Pretty::display(&node_id.0)));
684 let inputs = self.inputs();
685 for input in inputs.iter().peekable() {
686 node.children.push(input.explain_with_id());
687 }
688 Pretty::Record(node)
689 }
690
691 fn explain_to_string(&self) -> String {
693 let plan = reorganize_elements_id(self.clone());
694
695 let mut output = String::with_capacity(2048);
696 let mut config = pretty_config();
697 config.unicode(&mut output, &plan.explain());
698 output
699 }
700
701 fn explain_to_json(&self) -> String {
703 let plan = reorganize_elements_id(self.clone());
704 let explain_ir = plan.explain();
705 serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
706 .expect("failed to serialize plan to json")
707 }
708
709 fn explain_to_xml(&self) -> String {
711 let plan = reorganize_elements_id(self.clone());
712 let explain_ir = plan.explain();
713 quick_xml::se::to_string(&PrettySerde(explain_ir, true))
714 .expect("failed to serialize plan to xml")
715 }
716
717 fn explain_to_yaml(&self) -> String {
719 let plan = reorganize_elements_id(self.clone());
720 let explain_ir = plan.explain();
721 serde_yaml::to_string(&PrettySerde(explain_ir, true))
722 .expect("failed to serialize plan to yaml")
723 }
724
725 fn explain_to_dot(&self) -> String {
727 let plan = reorganize_elements_id(self.clone());
728 let explain_ir = plan.explain_with_id();
729 let mut graph = Graph::<String, String>::new();
730 let mut nodes = HashMap::new();
731 build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
732 let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
733 dot.to_string()
734 }
735}
736
737pub(crate) fn pretty_config() -> PrettyConfig {
738 PrettyConfig {
739 indent: 3,
740 need_boundaries: false,
741 width: 2048,
742 reduced_spaces: true,
743 }
744}
745
746impl dyn PlanNode {
749 pub fn id(&self) -> PlanNodeId {
750 self.plan_base().id()
751 }
752
753 pub fn ctx(&self) -> OptimizerContextRef {
754 self.plan_base().ctx().clone()
755 }
756
757 pub fn schema(&self) -> &Schema {
758 self.plan_base().schema()
759 }
760
761 pub fn stream_key(&self) -> Option<&[usize]> {
762 self.plan_base().stream_key()
763 }
764
765 pub fn functional_dependency(&self) -> &FunctionalDependencySet {
766 self.plan_base().functional_dependency()
767 }
768}
769
770pub const PLAN_DEPTH_THRESHOLD: usize = 30;
772pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
774Consider simplifying or splitting the query if you encounter any issues.";
775
776impl dyn PlanNode {
777 pub fn to_stream_prost(
782 &self,
783 state: &mut BuildFragmentGraphState,
784 ) -> SchedulerResult<PbStreamPlan> {
785 recursive::tracker!().recurse(|t| {
786 if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
787 notice_to_user(PLAN_TOO_DEEP_NOTICE);
788 }
789
790 use stream::prelude::*;
791
792 if let Some(stream_table_scan) = self.as_stream_table_scan() {
793 return stream_table_scan.adhoc_to_stream_prost(state);
794 }
795 if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
796 return stream_cdc_table_scan.adhoc_to_stream_prost(state);
797 }
798 if let Some(stream_source_scan) = self.as_stream_source_scan() {
799 return stream_source_scan.adhoc_to_stream_prost(state);
800 }
801 if let Some(stream_share) = self.as_stream_share() {
802 return stream_share.adhoc_to_stream_prost(state);
803 }
804
805 let node = Some(self.try_to_stream_prost_body(state)?);
806 let input = self
807 .inputs()
808 .into_iter()
809 .map(|plan| plan.to_stream_prost(state))
810 .try_collect()?;
811 Ok(PbStreamPlan {
813 input,
814 identity: self.explain_myself_to_string(),
815 node_body: node,
816 operator_id: self.id().0 as _,
817 stream_key: self
818 .stream_key()
819 .unwrap_or_default()
820 .iter()
821 .map(|x| *x as u32)
822 .collect(),
823 fields: self.schema().to_prost(),
824 append_only: self.plan_base().append_only(),
825 })
826 })
827 }
828
829 pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
831 self.to_batch_prost_identity(true)
832 }
833
834 pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
837 recursive::tracker!().recurse(|t| {
838 if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
839 notice_to_user(PLAN_TOO_DEEP_NOTICE);
840 }
841
842 let node_body = Some(self.try_to_batch_prost_body()?);
843 let children = self
844 .inputs()
845 .into_iter()
846 .map(|plan| plan.to_batch_prost_identity(identity))
847 .try_collect()?;
848 Ok(PbBatchPlan {
849 children,
850 identity: if identity {
851 self.explain_myself_to_string()
852 } else {
853 "".into()
854 },
855 node_body,
856 })
857 })
858 }
859
860 pub fn explain_myself_to_string(&self) -> String {
861 self.distill_to_string()
862 }
863}
864
865mod plan_base;
866pub use plan_base::*;
867#[macro_use]
868mod plan_tree_node;
869pub use plan_tree_node::*;
870mod col_pruning;
871pub use col_pruning::*;
872mod expr_rewritable;
873pub use expr_rewritable::*;
874mod expr_visitable;
875
876mod convert;
877pub use convert::*;
878mod eq_join_predicate;
879pub use eq_join_predicate::*;
880mod to_prost;
881pub use to_prost::*;
882mod predicate_pushdown;
883pub use predicate_pushdown::*;
884mod merge_eq_nodes;
885pub use merge_eq_nodes::*;
886
887pub mod batch;
888pub mod generic;
889pub mod stream;
890
891pub use generic::{PlanAggCall, PlanAggCallDisplay};
892
893mod batch_delete;
894mod batch_exchange;
895mod batch_expand;
896mod batch_filter;
897mod batch_group_topn;
898mod batch_hash_agg;
899mod batch_hash_join;
900mod batch_hop_window;
901mod batch_insert;
902mod batch_limit;
903mod batch_log_seq_scan;
904mod batch_lookup_join;
905mod batch_max_one_row;
906mod batch_nested_loop_join;
907mod batch_over_window;
908mod batch_project;
909mod batch_project_set;
910mod batch_seq_scan;
911mod batch_simple_agg;
912mod batch_sort;
913mod batch_sort_agg;
914mod batch_source;
915mod batch_sys_seq_scan;
916mod batch_table_function;
917mod batch_topn;
918mod batch_union;
919mod batch_update;
920mod batch_values;
921mod logical_agg;
922mod logical_apply;
923mod logical_cdc_scan;
924mod logical_changelog;
925mod logical_cte_ref;
926mod logical_dedup;
927mod logical_delete;
928mod logical_except;
929mod logical_expand;
930mod logical_filter;
931mod logical_hop_window;
932mod logical_insert;
933mod logical_intersect;
934mod logical_join;
935mod logical_kafka_scan;
936mod logical_limit;
937mod logical_max_one_row;
938mod logical_multi_join;
939mod logical_now;
940mod logical_over_window;
941mod logical_project;
942mod logical_project_set;
943mod logical_recursive_union;
944mod logical_scan;
945mod logical_share;
946mod logical_source;
947mod logical_sys_scan;
948mod logical_table_function;
949mod logical_topn;
950mod logical_union;
951mod logical_update;
952mod logical_values;
953mod stream_asof_join;
954mod stream_changelog;
955mod stream_dedup;
956mod stream_delta_join;
957mod stream_dml;
958mod stream_dynamic_filter;
959mod stream_eowc_over_window;
960mod stream_exchange;
961mod stream_expand;
962mod stream_filter;
963mod stream_fs_fetch;
964mod stream_global_approx_percentile;
965mod stream_group_topn;
966mod stream_hash_agg;
967mod stream_hash_join;
968mod stream_hop_window;
969mod stream_join_common;
970mod stream_local_approx_percentile;
971mod stream_materialize;
972mod stream_now;
973mod stream_over_window;
974mod stream_project;
975mod stream_project_set;
976mod stream_row_id_gen;
977mod stream_row_merge;
978mod stream_simple_agg;
979mod stream_sink;
980mod stream_sort;
981mod stream_source;
982mod stream_source_scan;
983mod stream_stateless_simple_agg;
984mod stream_sync_log_store;
985mod stream_table_scan;
986mod stream_topn;
987mod stream_values;
988mod stream_watermark_filter;
989
990mod batch_file_scan;
991mod batch_iceberg_scan;
992mod batch_kafka_scan;
993mod batch_postgres_query;
994
995mod batch_mysql_query;
996mod derive;
997mod logical_file_scan;
998mod logical_iceberg_scan;
999mod logical_postgres_query;
1000
1001mod logical_mysql_query;
1002mod stream_cdc_table_scan;
1003mod stream_share;
1004mod stream_temporal_join;
1005mod stream_union;
1006pub mod utils;
1007
1008pub use batch_delete::BatchDelete;
1009pub use batch_exchange::BatchExchange;
1010pub use batch_expand::BatchExpand;
1011pub use batch_file_scan::BatchFileScan;
1012pub use batch_filter::BatchFilter;
1013pub use batch_group_topn::BatchGroupTopN;
1014pub use batch_hash_agg::BatchHashAgg;
1015pub use batch_hash_join::BatchHashJoin;
1016pub use batch_hop_window::BatchHopWindow;
1017pub use batch_iceberg_scan::BatchIcebergScan;
1018pub use batch_insert::BatchInsert;
1019pub use batch_kafka_scan::BatchKafkaScan;
1020pub use batch_limit::BatchLimit;
1021pub use batch_log_seq_scan::BatchLogSeqScan;
1022pub use batch_lookup_join::BatchLookupJoin;
1023pub use batch_max_one_row::BatchMaxOneRow;
1024pub use batch_mysql_query::BatchMySqlQuery;
1025pub use batch_nested_loop_join::BatchNestedLoopJoin;
1026pub use batch_over_window::BatchOverWindow;
1027pub use batch_postgres_query::BatchPostgresQuery;
1028pub use batch_project::BatchProject;
1029pub use batch_project_set::BatchProjectSet;
1030pub use batch_seq_scan::BatchSeqScan;
1031pub use batch_simple_agg::BatchSimpleAgg;
1032pub use batch_sort::BatchSort;
1033pub use batch_sort_agg::BatchSortAgg;
1034pub use batch_source::BatchSource;
1035pub use batch_sys_seq_scan::BatchSysSeqScan;
1036pub use batch_table_function::BatchTableFunction;
1037pub use batch_topn::BatchTopN;
1038pub use batch_union::BatchUnion;
1039pub use batch_update::BatchUpdate;
1040pub use batch_values::BatchValues;
1041pub use logical_agg::LogicalAgg;
1042pub use logical_apply::LogicalApply;
1043pub use logical_cdc_scan::LogicalCdcScan;
1044pub use logical_changelog::LogicalChangeLog;
1045pub use logical_cte_ref::LogicalCteRef;
1046pub use logical_dedup::LogicalDedup;
1047pub use logical_delete::LogicalDelete;
1048pub use logical_except::LogicalExcept;
1049pub use logical_expand::LogicalExpand;
1050pub use logical_file_scan::LogicalFileScan;
1051pub use logical_filter::LogicalFilter;
1052pub use logical_hop_window::LogicalHopWindow;
1053pub use logical_iceberg_scan::LogicalIcebergScan;
1054pub use logical_insert::LogicalInsert;
1055pub use logical_intersect::LogicalIntersect;
1056pub use logical_join::LogicalJoin;
1057pub use logical_kafka_scan::LogicalKafkaScan;
1058pub use logical_limit::LogicalLimit;
1059pub use logical_max_one_row::LogicalMaxOneRow;
1060pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
1061pub use logical_mysql_query::LogicalMySqlQuery;
1062pub use logical_now::LogicalNow;
1063pub use logical_over_window::LogicalOverWindow;
1064pub use logical_postgres_query::LogicalPostgresQuery;
1065pub use logical_project::LogicalProject;
1066pub use logical_project_set::LogicalProjectSet;
1067pub use logical_recursive_union::LogicalRecursiveUnion;
1068pub use logical_scan::LogicalScan;
1069pub use logical_share::LogicalShare;
1070pub use logical_source::LogicalSource;
1071pub use logical_sys_scan::LogicalSysScan;
1072pub use logical_table_function::LogicalTableFunction;
1073pub use logical_topn::LogicalTopN;
1074pub use logical_union::LogicalUnion;
1075pub use logical_update::LogicalUpdate;
1076pub use logical_values::LogicalValues;
1077pub use stream_asof_join::StreamAsOfJoin;
1078pub use stream_cdc_table_scan::StreamCdcTableScan;
1079pub use stream_changelog::StreamChangeLog;
1080pub use stream_dedup::StreamDedup;
1081pub use stream_delta_join::StreamDeltaJoin;
1082pub use stream_dml::StreamDml;
1083pub use stream_dynamic_filter::StreamDynamicFilter;
1084pub use stream_eowc_over_window::StreamEowcOverWindow;
1085pub use stream_exchange::StreamExchange;
1086pub use stream_expand::StreamExpand;
1087pub use stream_filter::StreamFilter;
1088pub use stream_fs_fetch::StreamFsFetch;
1089pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
1090pub use stream_group_topn::StreamGroupTopN;
1091pub use stream_hash_agg::StreamHashAgg;
1092pub use stream_hash_join::StreamHashJoin;
1093pub use stream_hop_window::StreamHopWindow;
1094use stream_join_common::StreamJoinCommon;
1095pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
1096pub use stream_materialize::StreamMaterialize;
1097pub use stream_now::StreamNow;
1098pub use stream_over_window::StreamOverWindow;
1099pub use stream_project::StreamProject;
1100pub use stream_project_set::StreamProjectSet;
1101pub use stream_row_id_gen::StreamRowIdGen;
1102pub use stream_row_merge::StreamRowMerge;
1103pub use stream_share::StreamShare;
1104pub use stream_simple_agg::StreamSimpleAgg;
1105pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
1106pub use stream_sort::StreamEowcSort;
1107pub use stream_source::StreamSource;
1108pub use stream_source_scan::StreamSourceScan;
1109pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
1110pub use stream_sync_log_store::StreamSyncLogStore;
1111pub use stream_table_scan::StreamTableScan;
1112pub use stream_temporal_join::StreamTemporalJoin;
1113pub use stream_topn::StreamTopN;
1114pub use stream_union::StreamUnion;
1115pub use stream_values::StreamValues;
1116pub use stream_watermark_filter::StreamWatermarkFilter;
1117
1118use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
1119use crate::optimizer::optimizer_context::OptimizerContextRef;
1120use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
1121use crate::optimizer::plan_rewriter::PlanCloner;
1122use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
1123use crate::scheduler::SchedulerResult;
1124use crate::stream_fragmenter::BuildFragmentGraphState;
1125use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};
1126
1127#[macro_export]
1140macro_rules! for_all_plan_nodes {
1141 ($macro:ident) => {
1142 $macro! {
1143 { Logical, Agg }
1144 , { Logical, Apply }
1145 , { Logical, Filter }
1146 , { Logical, Project }
1147 , { Logical, Scan }
1148 , { Logical, CdcScan }
1149 , { Logical, SysScan }
1150 , { Logical, Source }
1151 , { Logical, Insert }
1152 , { Logical, Delete }
1153 , { Logical, Update }
1154 , { Logical, Join }
1155 , { Logical, Values }
1156 , { Logical, Limit }
1157 , { Logical, TopN }
1158 , { Logical, HopWindow }
1159 , { Logical, TableFunction }
1160 , { Logical, MultiJoin }
1161 , { Logical, Expand }
1162 , { Logical, ProjectSet }
1163 , { Logical, Union }
1164 , { Logical, OverWindow }
1165 , { Logical, Share }
1166 , { Logical, Now }
1167 , { Logical, Dedup }
1168 , { Logical, Intersect }
1169 , { Logical, Except }
1170 , { Logical, MaxOneRow }
1171 , { Logical, KafkaScan }
1172 , { Logical, IcebergScan }
1173 , { Logical, RecursiveUnion }
1174 , { Logical, CteRef }
1175 , { Logical, ChangeLog }
1176 , { Logical, FileScan }
1177 , { Logical, PostgresQuery }
1178 , { Logical, MySqlQuery }
1179 , { Batch, SimpleAgg }
1180 , { Batch, HashAgg }
1181 , { Batch, SortAgg }
1182 , { Batch, Project }
1183 , { Batch, Filter }
1184 , { Batch, Insert }
1185 , { Batch, Delete }
1186 , { Batch, Update }
1187 , { Batch, SeqScan }
1188 , { Batch, SysSeqScan }
1189 , { Batch, LogSeqScan }
1190 , { Batch, HashJoin }
1191 , { Batch, NestedLoopJoin }
1192 , { Batch, Values }
1193 , { Batch, Sort }
1194 , { Batch, Exchange }
1195 , { Batch, Limit }
1196 , { Batch, TopN }
1197 , { Batch, HopWindow }
1198 , { Batch, TableFunction }
1199 , { Batch, Expand }
1200 , { Batch, LookupJoin }
1201 , { Batch, ProjectSet }
1202 , { Batch, Union }
1203 , { Batch, GroupTopN }
1204 , { Batch, Source }
1205 , { Batch, OverWindow }
1206 , { Batch, MaxOneRow }
1207 , { Batch, KafkaScan }
1208 , { Batch, IcebergScan }
1209 , { Batch, FileScan }
1210 , { Batch, PostgresQuery }
1211 , { Batch, MySqlQuery }
1212 , { Stream, Project }
1213 , { Stream, Filter }
1214 , { Stream, TableScan }
1215 , { Stream, CdcTableScan }
1216 , { Stream, Sink }
1217 , { Stream, Source }
1218 , { Stream, SourceScan }
1219 , { Stream, HashJoin }
1220 , { Stream, Exchange }
1221 , { Stream, HashAgg }
1222 , { Stream, SimpleAgg }
1223 , { Stream, StatelessSimpleAgg }
1224 , { Stream, Materialize }
1225 , { Stream, TopN }
1226 , { Stream, HopWindow }
1227 , { Stream, DeltaJoin }
1228 , { Stream, Expand }
1229 , { Stream, DynamicFilter }
1230 , { Stream, ProjectSet }
1231 , { Stream, GroupTopN }
1232 , { Stream, Union }
1233 , { Stream, RowIdGen }
1234 , { Stream, Dml }
1235 , { Stream, Now }
1236 , { Stream, Share }
1237 , { Stream, WatermarkFilter }
1238 , { Stream, TemporalJoin }
1239 , { Stream, Values }
1240 , { Stream, Dedup }
1241 , { Stream, EowcOverWindow }
1242 , { Stream, EowcSort }
1243 , { Stream, OverWindow }
1244 , { Stream, FsFetch }
1245 , { Stream, ChangeLog }
1246 , { Stream, GlobalApproxPercentile }
1247 , { Stream, LocalApproxPercentile }
1248 , { Stream, RowMerge }
1249 , { Stream, AsOfJoin }
1250 , { Stream, SyncLogStore }
1251 }
1252 };
1253}
1254
1255#[macro_export]
1257macro_rules! for_logical_plan_nodes {
1258 ($macro:ident) => {
1259 $macro! {
1260 { Logical, Agg }
1261 , { Logical, Apply }
1262 , { Logical, Filter }
1263 , { Logical, Project }
1264 , { Logical, Scan }
1265 , { Logical, CdcScan }
1266 , { Logical, SysScan }
1267 , { Logical, Source }
1268 , { Logical, Insert }
1269 , { Logical, Delete }
1270 , { Logical, Update }
1271 , { Logical, Join }
1272 , { Logical, Values }
1273 , { Logical, Limit }
1274 , { Logical, TopN }
1275 , { Logical, HopWindow }
1276 , { Logical, TableFunction }
1277 , { Logical, MultiJoin }
1278 , { Logical, Expand }
1279 , { Logical, ProjectSet }
1280 , { Logical, Union }
1281 , { Logical, OverWindow }
1282 , { Logical, Share }
1283 , { Logical, Now }
1284 , { Logical, Dedup }
1285 , { Logical, Intersect }
1286 , { Logical, Except }
1287 , { Logical, MaxOneRow }
1288 , { Logical, KafkaScan }
1289 , { Logical, IcebergScan }
1290 , { Logical, RecursiveUnion }
1291 , { Logical, CteRef }
1292 , { Logical, ChangeLog }
1293 , { Logical, FileScan }
1294 , { Logical, PostgresQuery }
1295 , { Logical, MySqlQuery }
1296 }
1297 };
1298}
1299
1300#[macro_export]
1302macro_rules! for_batch_plan_nodes {
1303 ($macro:ident) => {
1304 $macro! {
1305 { Batch, SimpleAgg }
1306 , { Batch, HashAgg }
1307 , { Batch, SortAgg }
1308 , { Batch, Project }
1309 , { Batch, Filter }
1310 , { Batch, SeqScan }
1311 , { Batch, SysSeqScan }
1312 , { Batch, LogSeqScan }
1313 , { Batch, HashJoin }
1314 , { Batch, NestedLoopJoin }
1315 , { Batch, Values }
1316 , { Batch, Limit }
1317 , { Batch, Sort }
1318 , { Batch, TopN }
1319 , { Batch, Exchange }
1320 , { Batch, Insert }
1321 , { Batch, Delete }
1322 , { Batch, Update }
1323 , { Batch, HopWindow }
1324 , { Batch, TableFunction }
1325 , { Batch, Expand }
1326 , { Batch, LookupJoin }
1327 , { Batch, ProjectSet }
1328 , { Batch, Union }
1329 , { Batch, GroupTopN }
1330 , { Batch, Source }
1331 , { Batch, OverWindow }
1332 , { Batch, MaxOneRow }
1333 , { Batch, KafkaScan }
1334 , { Batch, IcebergScan }
1335 , { Batch, FileScan }
1336 , { Batch, PostgresQuery }
1337 , { Batch, MySqlQuery }
1338 }
1339 };
1340}
1341
1342#[macro_export]
1344macro_rules! for_stream_plan_nodes {
1345 ($macro:ident) => {
1346 $macro! {
1347 { Stream, Project }
1348 , { Stream, Filter }
1349 , { Stream, HashJoin }
1350 , { Stream, Exchange }
1351 , { Stream, TableScan }
1352 , { Stream, CdcTableScan }
1353 , { Stream, Sink }
1354 , { Stream, Source }
1355 , { Stream, SourceScan }
1356 , { Stream, HashAgg }
1357 , { Stream, SimpleAgg }
1358 , { Stream, StatelessSimpleAgg }
1359 , { Stream, Materialize }
1360 , { Stream, TopN }
1361 , { Stream, HopWindow }
1362 , { Stream, DeltaJoin }
1363 , { Stream, Expand }
1364 , { Stream, DynamicFilter }
1365 , { Stream, ProjectSet }
1366 , { Stream, GroupTopN }
1367 , { Stream, Union }
1368 , { Stream, RowIdGen }
1369 , { Stream, Dml }
1370 , { Stream, Now }
1371 , { Stream, Share }
1372 , { Stream, WatermarkFilter }
1373 , { Stream, TemporalJoin }
1374 , { Stream, Values }
1375 , { Stream, Dedup }
1376 , { Stream, EowcOverWindow }
1377 , { Stream, EowcSort }
1378 , { Stream, OverWindow }
1379 , { Stream, FsFetch }
1380 , { Stream, ChangeLog }
1381 , { Stream, GlobalApproxPercentile }
1382 , { Stream, LocalApproxPercentile }
1383 , { Stream, RowMerge }
1384 , { Stream, AsOfJoin }
1385 , { Stream, SyncLogStore }
1386 }
1387 };
1388}
1389
1390macro_rules! impl_plan_node_meta {
1392 ($( { $convention:ident, $name:ident }),*) => {
1393 paste!{
1394 #[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
1396 pub enum PlanNodeType {
1397 $( [<$convention $name>] ),*
1398 }
1399
1400 $(impl PlanNodeMeta for [<$convention $name>] {
1401 type Convention = $convention;
1402 const NODE_TYPE: PlanNodeType = PlanNodeType::[<$convention $name>];
1403
1404 fn plan_base(&self) -> &PlanBase<$convention> {
1405 &self.base
1406 }
1407
1408 fn plan_base_ref(&self) -> PlanBaseRef<'_> {
1409 PlanBaseRef::$convention(&self.base)
1410 }
1411 }
1412
1413 impl Deref for [<$convention $name>] {
1414 type Target = PlanBase<$convention>;
1415
1416 fn deref(&self) -> &Self::Target {
1417 &self.base
1418 }
1419 })*
1420 }
1421 }
1422}
1423
1424for_all_plan_nodes! { impl_plan_node_meta }
1425
1426macro_rules! impl_plan_node {
1427 ($({ $convention:ident, $name:ident }),*) => {
1428 paste!{
1429 $(impl PlanNode for [<$convention $name>] { })*
1430 }
1431 }
1432}
1433
1434for_all_plan_nodes! { impl_plan_node }
1435
1436macro_rules! impl_down_cast_fn {
1438 ($( { $convention:ident, $name:ident }),*) => {
1439 paste!{
1440 impl dyn PlanNode {
1441 $( pub fn [< as_$convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
1442 self.downcast_ref::<[<$convention $name>]>()
1443 } )*
1444 }
1445 }
1446 }
1447}
1448
1449for_all_plan_nodes! { impl_down_cast_fn }