use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;

use downcast_rs::{Downcast, impl_downcast};
use dyn_clone::DynClone;
use itertools::Itertools;
use paste::paste;
use petgraph::dot::{Config, Dot};
use petgraph::graph::Graph;
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::Schema;
use risingwave_common::util::recursive::{self, Recurse};
use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
use serde::Serialize;

use self::batch::BatchPlanNodeMetadata;
use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanNodeMetadata;
use self::utils::Distill;
use super::property::{
    Distribution, FunctionalDependencySet, MonotonicityMap, Order, WatermarkColumns,
};
use crate::error::{ErrorCode, Result};
use crate::optimizer::ExpressionSimplifyRewriter;
use crate::optimizer::property::StreamKind;
use crate::session::current::notice_to_user;
use crate::utils::{PrettySerde, build_graph_from_pretty};

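/// The marker type for a plan convention (`Logical`, `Batch` or `Stream`).
///
/// It bundles what differs between conventions: the extra fields stored in `PlanBase`,
/// the node-type enum, the object-safe plan node trait, and the share-node type used to
/// build DAG-shaped plans.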
pub trait ConventionMarker: 'static + Sized + Clone + Debug + Eq + PartialEq + Hash {
    type Extra: 'static + Eq + Hash + Clone + Debug;
    type ShareNode: ShareNode<Self>;
    type PlanRefDyn: PlanNodeCommon<Self> + Eq + Hash + ?Sized;
    type PlanNodeType;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode>;
}

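/// The `Share` operator of a convention. Its input can be replaced in place, which is what
/// allows a plan to form a DAG rather than a tree.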
pub trait ShareNode<C: ConventionMarker>:
    AnyPlanNodeMeta<C> + PlanTreeNodeUnary<C> + 'static
{
    fn new_share(share: generic::Share<PlanRef<C>>) -> PlanRef<C>;
    fn replace_input(&self, plan: PlanRef<C>);
}

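/// A placeholder `ShareNode` for conventions without a share operator (currently `Batch`).
/// The struct is uninhabited (note the `!` field), so none of its methods can ever be called.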
pub struct NoShareNode<C: ConventionMarker>(!, PhantomData<C>);

impl<C: ConventionMarker> ShareNode<C> for NoShareNode<C> {
    fn new_share(_plan: generic::Share<PlanRef<C>>) -> PlanRef<C> {
        unreachable!()
    }

    fn replace_input(&self, _plan: PlanRef<C>) {
        unreachable!()
    }
}

impl<C: ConventionMarker> PlanTreeNodeUnary<C> for NoShareNode<C> {
    fn input(&self) -> PlanRef<C> {
        unreachable!()
    }

    fn clone_with_input(&self, _input: PlanRef<C>) -> Self {
        unreachable!()
    }
}

impl<C: ConventionMarker> AnyPlanNodeMeta<C> for NoShareNode<C> {
    fn node_type(&self) -> C::PlanNodeType {
        unreachable!()
    }

    fn plan_base(&self) -> &PlanBase<C> {
        unreachable!()
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Logical;
impl ConventionMarker for Logical {
    type Extra = plan_base::NoExtra;
    type PlanNodeType = LogicalPlanNodeType;
    type PlanRefDyn = dyn LogicalPlanNode;
    type ShareNode = LogicalShare;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        plan.as_logical_share()
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Batch;
impl ConventionMarker for Batch {
    type Extra = plan_base::BatchExtra;
    type PlanNodeType = BatchPlanNodeType;
    type PlanRefDyn = dyn BatchPlanNode;
    type ShareNode = NoShareNode<Batch>;

    fn as_share(_plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        None
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Stream;
impl ConventionMarker for Stream {
    type Extra = plan_base::StreamExtra;
    type PlanNodeType = StreamPlanNodeType;
    type PlanRefDyn = dyn StreamPlanNode;
    type ShareNode = StreamShare;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        plan.as_stream_share()
    }
}

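/// The static metadata of a concrete plan node type: its convention, its variant in the
/// node-type enum, and access to its `PlanBase`.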
pub trait PlanNodeMeta {
    type Convention: ConventionMarker;
    const NODE_TYPE: <Self::Convention as ConventionMarker>::PlanNodeType;
    fn plan_base(&self) -> &PlanBase<Self::Convention>;
}

mod plan_node_meta {
    use super::*;

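    /// The object-safe version of `PlanNodeMeta`; a blanket implementation is provided for
    /// every type implementing `PlanNodeMeta`.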
    pub trait AnyPlanNodeMeta<C: ConventionMarker> {
        fn node_type(&self) -> C::PlanNodeType;
        fn plan_base(&self) -> &PlanBase<C>;
    }

    impl<P> AnyPlanNodeMeta<P::Convention> for P
    where
        P: PlanNodeMeta,
    {
        fn node_type(&self) -> <P::Convention as ConventionMarker>::PlanNodeType {
            P::NODE_TYPE
        }

        fn plan_base(&self) -> &PlanBase<P::Convention> {
            <Self as PlanNodeMeta>::plan_base(self)
        }
    }
}
use plan_node_meta::AnyPlanNodeMeta;

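/// The bounds shared by the plan node traits of all conventions.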
pub trait PlanNodeCommon<C: ConventionMarker> = PlanTreeNode<C>
    + DynClone
    + DynEq
    + DynHash
    + Distill
    + Debug
    + Downcast
    + ExprRewritable<C>
    + ExprVisitable
    + AnyPlanNodeMeta<C>;

pub trait StreamPlanNode: PlanNodeCommon<Stream> + TryToStreamPb {}
pub trait BatchPlanNode:
    PlanNodeCommon<Batch> + ToDistributedBatch + ToLocalBatch + TryToBatchPb
{
}
pub trait LogicalPlanNode:
    PlanNodeCommon<Logical> + ColPrunable + PredicatePushdown + ToBatch + ToStream
{
}

macro_rules! impl_trait {
    ($($convention:ident),+) => {
        paste! {
            $(
                impl Hash for dyn [<$convention PlanNode>] {
                    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
                        self.dyn_hash(state);
                    }
                }

                impl PartialEq for dyn [<$convention PlanNode>] {
                    fn eq(&self, other: &Self) -> bool {
                        self.dyn_eq(other.as_dyn_eq())
                    }
                }

                impl Eq for dyn [<$convention PlanNode>] {}
            )+
        }
    };
}

impl_trait!(Batch, Stream, Logical);
impl_downcast!(BatchPlanNode);
impl_downcast!(LogicalPlanNode);
impl_downcast!(StreamPlanNode);

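/// A reference-counted handle to a plan node of convention `C`, used as the edge type of the
/// plan tree (or DAG). See also the aliases `LogicalPlanRef`, `BatchPlanRef` and
/// `StreamPlanRef`.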
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Eq, Hash)]
pub struct PlanRef<C: ConventionMarker>(Rc<C::PlanRefDyn>);

impl<C: ConventionMarker> Clone for PlanRef<C> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

pub type LogicalPlanRef = PlanRef<Logical>;
pub type StreamPlanRef = PlanRef<Stream>;
pub type BatchPlanRef = PlanRef<Batch>;

#[allow(clippy::op_ref)]
impl<C: ConventionMarker> PartialEq for PlanRef<C> {
    fn eq(&self, other: &Self) -> bool {
        &self.0 == &other.0
    }
}

impl<C: ConventionMarker> Deref for PlanRef<C> {
    type Target = C::PlanRefDyn;

    fn deref(&self) -> &Self::Target {
        self.0.deref()
    }
}

impl<T: LogicalPlanNode> From<T> for PlanRef<Logical> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<T: StreamPlanNode> From<T> for PlanRef<Stream> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<T: BatchPlanNode> From<T> for PlanRef<Batch> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<C: ConventionMarker> Layer for PlanRef<C> {
    type Sub = Self;

    fn map<F>(self, f: F) -> Self
    where
        F: FnMut(Self::Sub) -> Self::Sub,
    {
        self.clone_root_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
    }

    fn descent<F>(&self, f: F)
    where
        F: FnMut(&Self::Sub),
    {
        self.inputs().iter().for_each(f);
    }
}

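/// The id of a plan node, assigned by the optimizer context.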
#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct PlanNodeId(pub i32);

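/// An `Endo` (plan transformer) that is aware of the DAG structure introduced by
/// `LogicalShare`: `dag_apply` goes through `cached` for share nodes so that a shared
/// sub-plan is only transformed once.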
pub trait EndoPlan: Endo<LogicalPlanRef> {
    fn cached<F>(&mut self, plan: LogicalPlanRef, f: F) -> LogicalPlanRef
    where
        F: FnMut(&mut Self) -> LogicalPlanRef;

    fn dag_apply(&mut self, plan: LogicalPlanRef) -> LogicalPlanRef {
        match plan.as_logical_share() {
            Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
            None => self.tree_apply(plan),
        }
    }
}

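/// A `Visit` (plan visitor) that is aware of the DAG structure introduced by `LogicalShare`:
/// `dag_visit` goes through `visited` for share nodes so that a shared sub-plan is only
/// visited once.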
pub trait VisitPlan: Visit<LogicalPlanRef> {
    fn visited<F>(&mut self, plan: &LogicalPlanRef, f: F)
    where
        F: FnMut(&mut Self);

    fn dag_visit(&mut self, plan: &LogicalPlanRef) {
        match plan.as_logical_share() {
            Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
            None => self.tree_visit(plan),
        }
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef<C> {
        let new = self.rewrite_exprs(r);
        let inputs: Vec<PlanRef<C>> = new
            .inputs()
            .iter()
            .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
            .collect();
        new.clone_root_with_inputs(&inputs[..])
    }
}

pub(crate) trait VisitExprsRecursive {
    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
}

impl<C: ConventionMarker> VisitExprsRecursive for PlanRef<C> {
    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
        self.visit_exprs(r);
        self.inputs()
            .iter()
            .for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn expect_stream_key(&self) -> &[usize] {
        self.stream_key().unwrap_or_else(|| {
            panic!(
                "a stream key is expected but does not exist, plan:\n{}",
                self.explain_to_string()
            )
        })
    }
}

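// Column pruning and predicate pushdown need special handling for `LogicalShare`: the
// required columns / predicates of all parents are merged before being pushed into the
// shared input, so that the share operator itself is only rewritten once.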
impl LogicalPlanRef {
    fn prune_col_inner(
        &self,
        required_cols: &[usize],
        ctx: &mut ColumnPruningContext,
    ) -> LogicalPlanRef {
        if let Some(logical_share) = self.as_logical_share() {
            if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
                if ctx.get_parent_num(logical_share) == 1 {
                    let input: LogicalPlanRef = logical_share.input();
                    return input.prune_col(required_cols, ctx);
                }

                if ctx.visit_share_at_first_round(self.id()) {
                    let new_logical_share: &LogicalShare = new_share
                        .as_logical_share()
                        .expect("must be share operator");
                    let new_share_input = new_logical_share.input().prune_col(
                        &(0..new_logical_share.base.schema().len()).collect_vec(),
                        ctx,
                    );
                    new_logical_share.replace_input(new_share_input);
                }

                let new_required_cols: Vec<usize> = required_cols
                    .iter()
                    .map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
                    .collect_vec();
                let mapping = ColIndexMapping::with_remaining_columns(
                    &new_required_cols,
                    new_share.schema().len(),
                );
                return LogicalProject::with_mapping(new_share, mapping).into();
            }

            let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
            if parent_has_pushed == ctx.get_parent_num(logical_share) {
                let merge_require_cols = ctx
                    .take_required_cols(self.id())
                    .expect("must have required columns")
                    .into_iter()
                    .flat_map(|x| x.into_iter())
                    .sorted()
                    .dedup()
                    .collect_vec();
                let input: LogicalPlanRef = logical_share.input();
                let input = input.prune_col(&merge_require_cols, ctx);

                let new_logical_share = LogicalShare::create(input.clone());
                ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());

                let exprs = logical_share
                    .base
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(i, field)| {
                        if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
                            ExprImpl::InputRef(Box::new(InputRef::new(
                                pos,
                                field.data_type.clone(),
                            )))
                        } else {
                            ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
                        }
                    })
                    .collect_vec();
                let project = LogicalProject::create(input, exprs);
                logical_share.replace_input(project);
            }
            let mapping =
                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
            LogicalProject::with_mapping(self.clone(), mapping).into()
        } else {
            let dyn_t = self.deref();
            dyn_t.prune_col(required_cols, ctx)
        }
    }

    fn predicate_pushdown_inner(
        &self,
        predicate: Condition,
        ctx: &mut PredicatePushdownContext,
    ) -> LogicalPlanRef {
        if let Some(logical_share) = self.as_logical_share() {
            if ctx.get_parent_num(logical_share) == 1 {
                let input: LogicalPlanRef = logical_share.input();
                return input.predicate_pushdown(predicate, ctx);
            }

            let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
            if parent_has_pushed == ctx.get_parent_num(logical_share) {
                let merge_predicate = ctx
                    .take_predicate(self.id())
                    .expect("must have predicate")
                    .into_iter()
                    .map(|mut c| Condition {
                        conjunctions: c
                            .conjunctions
                            .extract_if(.., |e| {
                                let mut finder = ExprCorrelatedIdFinder::default();
                                finder.visit_expr(e);
                                e.count_nows() == 0
                                    && e.is_pure()
                                    && !finder.has_correlated_input_ref()
                            })
                            .collect(),
                    })
                    .reduce(|a, b| a.or(b))
                    .unwrap();

                let mut expr_rewriter = ExpressionSimplifyRewriter {};
                let mut new_predicate = Condition::true_cond();

                for c in merge_predicate.conjunctions {
                    let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
                    new_predicate = new_predicate.and(c);
                }

                let input: LogicalPlanRef = logical_share.input();
                let input = input.predicate_pushdown(new_predicate, ctx);
                logical_share.replace_input(input);
            }
            LogicalFilter::create(self.clone(), predicate)
        } else {
            let dyn_t = self.deref();
            dyn_t.predicate_pushdown(predicate, ctx)
        }
    }
}

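// In debug builds, the `ColPrunable` and `PredicatePushdown` impls below additionally check
// that the rewritten plan is equivalent to the original one.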
impl ColPrunable for LogicalPlanRef {
    #[allow(clippy::let_and_return)]
    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> LogicalPlanRef {
        let res = self.prune_col_inner(required_cols, ctx);
        #[cfg(debug_assertions)]
        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
            "column pruning",
            &LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
            &res,
        );
        res
    }
}

impl PredicatePushdown for LogicalPlanRef {
    #[allow(clippy::let_and_return)]
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        ctx: &mut PredicatePushdownContext,
    ) -> LogicalPlanRef {
        #[cfg(debug_assertions)]
        let predicate_clone = predicate.clone();

        let res = self.predicate_pushdown_inner(predicate, ctx);

        #[cfg(debug_assertions)]
        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
            "predicate push down",
            &LogicalFilter::new(self.clone(), predicate_clone).into(),
            &res,
        );

        res
    }
}

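// Note that for a share node, `clone_root_with_inputs` replaces the input in place instead of
// cloning the node, so the sharing structure of the DAG is preserved.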
impl<C: ConventionMarker> PlanRef<C> {
    pub fn clone_root_with_inputs(&self, inputs: &[PlanRef<C>]) -> PlanRef<C> {
        if let Some(share) = self.as_share_node() {
            assert_eq!(inputs.len(), 1);
            share.replace_input(inputs[0].clone());
            self.clone()
        } else {
            let dyn_t = self.deref();
            dyn_t.clone_with_inputs(inputs)
        }
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn node_type(&self) -> C::PlanNodeType {
        self.0.node_type()
    }

    pub fn plan_base(&self) -> &PlanBase<C> {
        self.0.plan_base()
    }
}

impl<C: ConventionMarker> GenericPlanRef for PlanRef<C> {
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}

impl PhysicalPlanRef for BatchPlanRef {
    fn distribution(&self) -> &Distribution {
        self.plan_base().distribution()
    }
}

impl PhysicalPlanRef for StreamPlanRef {
    fn distribution(&self) -> &Distribution {
        self.plan_base().distribution()
    }
}

impl StreamPlanNodeMetadata for StreamPlanRef {
    fn stream_kind(&self) -> StreamKind {
        self.plan_base().stream_kind()
    }

    fn emit_on_window_close(&self) -> bool {
        self.plan_base().emit_on_window_close()
    }

    fn watermark_columns(&self) -> &WatermarkColumns {
        self.plan_base().watermark_columns()
    }

    fn columns_monotonicity(&self) -> &MonotonicityMap {
        self.plan_base().columns_monotonicity()
    }
}

impl BatchPlanNodeMetadata for BatchPlanRef {
    fn order(&self) -> &Order {
        self.plan_base().order()
    }
}

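/// Back up the element id counters of the optimizer context, reset them, deep-clone the whole
/// plan (re-assigning ids from the fresh counters), and then restore the original counters.
/// Used by the `Explain` implementations so that the ids in the output start from a clean state.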
pub fn reorganize_elements_id<C: ConventionMarker>(plan: PlanRef<C>) -> PlanRef<C> {
    let backup = plan.ctx().backup_elem_ids();
    plan.ctx().reset_elem_ids();
    let plan = PlanCloner::clone_whole_plan(plan);
    plan.ctx().restore_elem_ids(backup);
    plan
}

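/// Explain the plan in various output formats.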
pub trait Explain {
    fn explain<'a>(&self) -> Pretty<'a>;
    fn explain_with_id<'a>(&self) -> Pretty<'a>;
    fn explain_to_string(&self) -> String;
    fn explain_to_json(&self) -> String;
    fn explain_to_xml(&self) -> String;
    fn explain_to_yaml(&self) -> String;
    fn explain_to_dot(&self) -> String;
}

impl<C: ConventionMarker> Explain for PlanRef<C> {
    fn explain<'a>(&self) -> Pretty<'a> {
        let mut node = self.distill();
        let inputs = self.inputs();
        for input in inputs.iter().peekable() {
            node.children.push(input.explain());
        }
        Pretty::Record(node)
    }

    fn explain_with_id<'a>(&self) -> Pretty<'a> {
        let node_id = self.id();
        let mut node = self.distill();
        node.fields
            .insert(0, ("id".into(), Pretty::display(&node_id.0)));
        let inputs = self.inputs();
        for input in inputs.iter().peekable() {
            node.children.push(input.explain_with_id());
        }
        Pretty::Record(node)
    }

    fn explain_to_string(&self) -> String {
        let plan = reorganize_elements_id(self.clone());

        let mut output = String::with_capacity(2048);
        let mut config = pretty_config();
        config.unicode(&mut output, &plan.explain());
        output
    }

    fn explain_to_json(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to json")
    }

    fn explain_to_xml(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        quick_xml::se::to_string(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to xml")
    }

    fn explain_to_yaml(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        serde_yaml::to_string(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to yaml")
    }

    fn explain_to_dot(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain_with_id();
        let mut graph = Graph::<String, String>::new();
        let mut nodes = HashMap::new();
        build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
        let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
        dot.to_string()
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn as_share_node(&self) -> Option<&C::ShareNode> {
        C::as_share(self)
    }
}

pub(crate) fn pretty_config() -> PrettyConfig {
    PrettyConfig {
        indent: 3,
        need_boundaries: false,
        width: 2048,
        reduced_spaces: true,
    }
}

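// Expose the common `PlanBase` accessors (`id`, `ctx`, `schema`, ...) directly on the `dyn`
// plan node type of each convention.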
macro_rules! impl_generic_plan_ref_method {
    ($($convention:ident),+) => {
        paste! {
            $(
                impl dyn [<$convention PlanNode>] {
                    pub fn id(&self) -> PlanNodeId {
                        self.plan_base().id()
                    }

                    pub fn ctx(&self) -> OptimizerContextRef {
                        self.plan_base().ctx().clone()
                    }

                    pub fn schema(&self) -> &Schema {
                        self.plan_base().schema()
                    }

                    pub fn stream_key(&self) -> Option<&[usize]> {
                        self.plan_base().stream_key()
                    }

                    pub fn functional_dependency(&self) -> &FunctionalDependencySet {
                        self.plan_base().functional_dependency()
                    }

                    pub fn explain_myself_to_string(&self) -> String {
                        self.distill_to_string()
                    }
                }
            )+
        }
    };
}

impl_generic_plan_ref_method!(Batch, Stream, Logical);

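/// The plan depth at which `PLAN_TOO_DEEP_NOTICE` is sent to the user while serializing the
/// plan to protobuf.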
pub const PLAN_DEPTH_THRESHOLD: usize = 30;
pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
Consider simplifying or splitting the query if you encounter any issues.";

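// Serialization of the stream plan to protobuf. Scans and shares are handled by their own
// `adhoc_to_stream_prost`; everything else is serialized field by field here.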
impl dyn StreamPlanNode {
    pub fn to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamPlan> {
        recursive::tracker!().recurse(|t| {
            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                notice_to_user(PLAN_TOO_DEEP_NOTICE);
            }

            use stream::prelude::*;

            if let Some(stream_table_scan) = self.as_stream_table_scan() {
                return stream_table_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
                return stream_cdc_table_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_source_scan) = self.as_stream_source_scan() {
                return stream_source_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_share) = self.as_stream_share() {
                return stream_share.adhoc_to_stream_prost(state);
            }

            let node = Some(self.try_to_stream_prost_body(state)?);
            let input = self
                .inputs()
                .into_iter()
                .map(|plan| plan.to_stream_prost(state))
                .try_collect()?;
            Ok(PbStreamPlan {
                input,
                identity: self.explain_myself_to_string(),
                node_body: node,
                operator_id: self.id().0 as _,
                stream_key: self
                    .stream_key()
                    .unwrap_or_default()
                    .iter()
                    .map(|x| *x as u32)
                    .collect(),
                fields: self.schema().to_prost(),
                append_only: self.plan_base().append_only(),
            })
        })
    }
}

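// Serialization of the batch plan to protobuf. The `identity` flag controls whether the
// human-readable identity string of each node is included.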
impl dyn BatchPlanNode {
    pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
        self.to_batch_prost_identity(true)
    }

    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
        recursive::tracker!().recurse(|t| {
            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                notice_to_user(PLAN_TOO_DEEP_NOTICE);
            }

            let node_body = Some(self.try_to_batch_prost_body()?);
            let children = self
                .inputs()
                .into_iter()
                .map(|plan| plan.to_batch_prost_identity(identity))
                .try_collect()?;
            Ok(PbBatchPlan {
                children,
                identity: if identity {
                    self.explain_myself_to_string()
                } else {
                    "".into()
                },
                node_body,
            })
        })
    }
}

mod plan_base;
pub use plan_base::*;
#[macro_use]
mod plan_tree_node;
pub use plan_tree_node::*;
mod col_pruning;
pub use col_pruning::*;
mod expr_rewritable;
pub use expr_rewritable::*;
mod expr_visitable;

mod convert;
pub use convert::*;
mod eq_join_predicate;
pub use eq_join_predicate::*;
mod to_prost;
pub use to_prost::*;
mod predicate_pushdown;
pub use predicate_pushdown::*;
mod merge_eq_nodes;
pub use merge_eq_nodes::*;

pub mod batch;
pub mod generic;
pub mod stream;

pub use generic::{PlanAggCall, PlanAggCallDisplay};

mod batch_delete;
mod batch_exchange;
mod batch_expand;
mod batch_filter;
mod batch_group_topn;
mod batch_hash_agg;
mod batch_hash_join;
mod batch_hop_window;
mod batch_insert;
mod batch_limit;
mod batch_log_seq_scan;
mod batch_lookup_join;
mod batch_max_one_row;
mod batch_nested_loop_join;
mod batch_over_window;
mod batch_project;
mod batch_project_set;
mod batch_seq_scan;
mod batch_simple_agg;
mod batch_sort;
mod batch_sort_agg;
mod batch_source;
mod batch_sys_seq_scan;
mod batch_table_function;
mod batch_topn;
mod batch_union;
mod batch_update;
mod batch_values;
mod logical_agg;
mod logical_apply;
mod logical_cdc_scan;
mod logical_changelog;
mod logical_cte_ref;
mod logical_dedup;
mod logical_delete;
mod logical_except;
mod logical_expand;
mod logical_filter;
mod logical_hop_window;
mod logical_insert;
mod logical_intersect;
mod logical_join;
mod logical_kafka_scan;
mod logical_limit;
mod logical_max_one_row;
mod logical_multi_join;
mod logical_now;
mod logical_over_window;
mod logical_project;
mod logical_project_set;
mod logical_recursive_union;
mod logical_scan;
mod logical_share;
mod logical_source;
mod logical_sys_scan;
mod logical_table_function;
mod logical_topn;
mod logical_union;
mod logical_update;
mod logical_values;
mod stream_asof_join;
mod stream_changelog;
mod stream_dedup;
mod stream_delta_join;
mod stream_dml;
mod stream_dynamic_filter;
mod stream_eowc_over_window;
mod stream_exchange;
mod stream_expand;
mod stream_filter;
mod stream_fs_fetch;
mod stream_global_approx_percentile;
mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_join_common;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_materialized_exprs;
mod stream_now;
mod stream_over_window;
mod stream_project;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_row_merge;
mod stream_simple_agg;
mod stream_sink;
mod stream_sort;
mod stream_source;
mod stream_source_scan;
mod stream_stateless_simple_agg;
mod stream_sync_log_store;
mod stream_table_scan;
mod stream_topn;
mod stream_values;
mod stream_watermark_filter;

mod batch_file_scan;
mod batch_iceberg_scan;
mod batch_kafka_scan;
mod batch_postgres_query;

mod batch_mysql_query;
mod derive;
mod logical_file_scan;
mod logical_iceberg_scan;
mod logical_postgres_query;

mod logical_mysql_query;
mod stream_cdc_table_scan;
mod stream_share;
mod stream_temporal_join;
mod stream_union;
pub mod utils;

pub use batch_delete::BatchDelete;
pub use batch_exchange::BatchExchange;
pub use batch_expand::BatchExpand;
pub use batch_file_scan::BatchFileScan;
pub use batch_filter::BatchFilter;
pub use batch_group_topn::BatchGroupTopN;
pub use batch_hash_agg::BatchHashAgg;
pub use batch_hash_join::BatchHashJoin;
pub use batch_hop_window::BatchHopWindow;
pub use batch_iceberg_scan::BatchIcebergScan;
pub use batch_insert::BatchInsert;
pub use batch_kafka_scan::BatchKafkaScan;
pub use batch_limit::BatchLimit;
pub use batch_log_seq_scan::BatchLogSeqScan;
pub use batch_lookup_join::BatchLookupJoin;
pub use batch_max_one_row::BatchMaxOneRow;
pub use batch_mysql_query::BatchMySqlQuery;
pub use batch_nested_loop_join::BatchNestedLoopJoin;
pub use batch_over_window::BatchOverWindow;
pub use batch_postgres_query::BatchPostgresQuery;
pub use batch_project::BatchProject;
pub use batch_project_set::BatchProjectSet;
pub use batch_seq_scan::BatchSeqScan;
pub use batch_simple_agg::BatchSimpleAgg;
pub use batch_sort::BatchSort;
pub use batch_sort_agg::BatchSortAgg;
pub use batch_source::BatchSource;
pub use batch_sys_seq_scan::BatchSysSeqScan;
pub use batch_table_function::BatchTableFunction;
pub use batch_topn::BatchTopN;
pub use batch_union::BatchUnion;
pub use batch_update::BatchUpdate;
pub use batch_values::BatchValues;
pub use logical_agg::LogicalAgg;
pub use logical_apply::LogicalApply;
pub use logical_cdc_scan::LogicalCdcScan;
pub use logical_changelog::LogicalChangeLog;
pub use logical_cte_ref::LogicalCteRef;
pub use logical_dedup::LogicalDedup;
pub use logical_delete::LogicalDelete;
pub use logical_except::LogicalExcept;
pub use logical_expand::LogicalExpand;
pub use logical_file_scan::LogicalFileScan;
pub use logical_filter::LogicalFilter;
pub use logical_hop_window::LogicalHopWindow;
pub use logical_iceberg_scan::LogicalIcebergScan;
pub use logical_insert::LogicalInsert;
pub use logical_intersect::LogicalIntersect;
pub use logical_join::LogicalJoin;
pub use logical_kafka_scan::LogicalKafkaScan;
pub use logical_limit::LogicalLimit;
pub use logical_max_one_row::LogicalMaxOneRow;
pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
pub use logical_mysql_query::LogicalMySqlQuery;
pub use logical_now::LogicalNow;
pub use logical_over_window::LogicalOverWindow;
pub use logical_postgres_query::LogicalPostgresQuery;
pub use logical_project::LogicalProject;
pub use logical_project_set::LogicalProjectSet;
pub use logical_recursive_union::LogicalRecursiveUnion;
pub use logical_scan::LogicalScan;
pub use logical_share::LogicalShare;
pub use logical_source::LogicalSource;
pub use logical_sys_scan::LogicalSysScan;
pub use logical_table_function::LogicalTableFunction;
pub use logical_topn::LogicalTopN;
pub use logical_union::LogicalUnion;
pub use logical_update::LogicalUpdate;
pub use logical_values::LogicalValues;
pub use stream_asof_join::StreamAsOfJoin;
pub use stream_cdc_table_scan::StreamCdcTableScan;
pub use stream_changelog::StreamChangeLog;
pub use stream_dedup::StreamDedup;
pub use stream_delta_join::StreamDeltaJoin;
pub use stream_dml::StreamDml;
pub use stream_dynamic_filter::StreamDynamicFilter;
pub use stream_eowc_over_window::StreamEowcOverWindow;
pub use stream_exchange::StreamExchange;
pub use stream_expand::StreamExpand;
pub use stream_filter::StreamFilter;
pub use stream_fs_fetch::StreamFsFetch;
pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
use stream_join_common::StreamJoinCommon;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_materialized_exprs::StreamMaterializedExprs;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_row_merge::StreamRowMerge;
pub use stream_share::StreamShare;
pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
pub use stream_sort::StreamEowcSort;
pub use stream_source::StreamSource;
pub use stream_source_scan::StreamSourceScan;
pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
pub use stream_sync_log_store::StreamSyncLogStore;
pub use stream_table_scan::StreamTableScan;
pub use stream_temporal_join::StreamTemporalJoin;
pub use stream_topn::StreamTopN;
pub use stream_union::StreamUnion;
pub use stream_values::StreamValues;
pub use stream_watermark_filter::StreamWatermarkFilter;

use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_rewriter::PlanCloner;
use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;

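/// Invoke `$macro` with the full list of `{ Convention, Name }` pairs for every plan node of
/// every convention (plus any extra arguments passed through).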
#[macro_export]
macro_rules! for_all_plan_nodes {
    ($macro:path $(,$rest:tt)*) => {
        $macro! {
            { Logical, Agg }
            , { Logical, Apply }
            , { Logical, Filter }
            , { Logical, Project }
            , { Logical, Scan }
            , { Logical, CdcScan }
            , { Logical, SysScan }
            , { Logical, Source }
            , { Logical, Insert }
            , { Logical, Delete }
            , { Logical, Update }
            , { Logical, Join }
            , { Logical, Values }
            , { Logical, Limit }
            , { Logical, TopN }
            , { Logical, HopWindow }
            , { Logical, TableFunction }
            , { Logical, MultiJoin }
            , { Logical, Expand }
            , { Logical, ProjectSet }
            , { Logical, Union }
            , { Logical, OverWindow }
            , { Logical, Share }
            , { Logical, Now }
            , { Logical, Dedup }
            , { Logical, Intersect }
            , { Logical, Except }
            , { Logical, MaxOneRow }
            , { Logical, KafkaScan }
            , { Logical, IcebergScan }
            , { Logical, RecursiveUnion }
            , { Logical, CteRef }
            , { Logical, ChangeLog }
            , { Logical, FileScan }
            , { Logical, PostgresQuery }
            , { Logical, MySqlQuery }
            , { Batch, SimpleAgg }
            , { Batch, HashAgg }
            , { Batch, SortAgg }
            , { Batch, Project }
            , { Batch, Filter }
            , { Batch, Insert }
            , { Batch, Delete }
            , { Batch, Update }
            , { Batch, SeqScan }
            , { Batch, SysSeqScan }
            , { Batch, LogSeqScan }
            , { Batch, HashJoin }
            , { Batch, NestedLoopJoin }
            , { Batch, Values }
            , { Batch, Sort }
            , { Batch, Exchange }
            , { Batch, Limit }
            , { Batch, TopN }
            , { Batch, HopWindow }
            , { Batch, TableFunction }
            , { Batch, Expand }
            , { Batch, LookupJoin }
            , { Batch, ProjectSet }
            , { Batch, Union }
            , { Batch, GroupTopN }
            , { Batch, Source }
            , { Batch, OverWindow }
            , { Batch, MaxOneRow }
            , { Batch, KafkaScan }
            , { Batch, IcebergScan }
            , { Batch, FileScan }
            , { Batch, PostgresQuery }
            , { Batch, MySqlQuery }
            , { Stream, Project }
            , { Stream, Filter }
            , { Stream, TableScan }
            , { Stream, CdcTableScan }
            , { Stream, Sink }
            , { Stream, Source }
            , { Stream, SourceScan }
            , { Stream, HashJoin }
            , { Stream, Exchange }
            , { Stream, HashAgg }
            , { Stream, SimpleAgg }
            , { Stream, StatelessSimpleAgg }
            , { Stream, Materialize }
            , { Stream, TopN }
            , { Stream, HopWindow }
            , { Stream, DeltaJoin }
            , { Stream, Expand }
            , { Stream, DynamicFilter }
            , { Stream, ProjectSet }
            , { Stream, GroupTopN }
            , { Stream, Union }
            , { Stream, RowIdGen }
            , { Stream, Dml }
            , { Stream, Now }
            , { Stream, Share }
            , { Stream, WatermarkFilter }
            , { Stream, TemporalJoin }
            , { Stream, Values }
            , { Stream, Dedup }
            , { Stream, EowcOverWindow }
            , { Stream, EowcSort }
            , { Stream, OverWindow }
            , { Stream, FsFetch }
            , { Stream, ChangeLog }
            , { Stream, GlobalApproxPercentile }
            , { Stream, LocalApproxPercentile }
            , { Stream, RowMerge }
            , { Stream, AsOfJoin }
            , { Stream, SyncLogStore }
            , { Stream, MaterializedExprs }
            $(,$rest)*
        }
    };
}

#[macro_export]
macro_rules! for_each_convention_all_plan_nodes {
    ($macro:path $(,$rest:tt)*) => {
        $crate::for_all_plan_nodes! {
            $crate::for_each_convention_all_plan_nodes
            , $macro
            $(,$rest)*
        }
    };
    (
        $( { Logical, $logical_name:ident } ),*
        , $( { Batch, $batch_name:ident } ),*
        , $( { Stream, $stream_name:ident } ),*
        , $macro:path $(,$rest:tt)*
    ) => {
        $macro! {
            {
                Logical, { $( $logical_name ),* },
                Batch, { $( $batch_name ),* },
                Stream, { $( $stream_name ),* }
            }
            $(,$rest)*
        }
    }
}

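// For each convention, generate the `<Convention>PlanNodeType` enum and implement
// `PlanNodeMeta` and `Deref<Target = PlanBase<_>>` for every concrete plan node struct.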
macro_rules! impl_plan_node_meta {
    ({
        $( $convention:ident, { $( $name:ident ),* }),*
    }) => {
        paste!{
            $(
                #[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
                pub enum [<$convention PlanNodeType>] {
                    $( [<$convention $name>] ),*
                }
            )*
            $(
                $(impl PlanNodeMeta for [<$convention $name>] {
                    type Convention = $convention;
                    const NODE_TYPE: [<$convention PlanNodeType>] = [<$convention PlanNodeType>]::[<$convention $name>];

                    fn plan_base(&self) -> &PlanBase<$convention> {
                        &self.base
                    }
                }

                impl Deref for [<$convention $name>] {
                    type Target = PlanBase<$convention>;

                    fn deref(&self) -> &Self::Target {
                        &self.base
                    }
                })*
            )*
        }
    }
}

for_each_convention_all_plan_nodes! { impl_plan_node_meta }

macro_rules! impl_plan_node {
    ($({ $convention:ident, $name:ident }),*) => {
        paste!{
            $(impl [<$convention PlanNode>] for [<$convention $name>] { })*
        }
    }
}

for_all_plan_nodes! { impl_plan_node }

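// Generate the `as_logical_agg`-style downcast helpers on the `dyn` plan node type of each
// convention.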
macro_rules! impl_down_cast_fn {
    ({
        $( $convention:ident, { $( $name:ident ),* }),*
    }) => {
        paste!{
            $(
                impl dyn [<$convention PlanNode>] {
                    $( pub fn [< as_ $convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
                        self.downcast_ref::<[<$convention $name>]>()
                    } )*
                }
            )*
        }
    }
}

for_each_convention_all_plan_nodes! { impl_down_cast_fn }