use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;

use downcast_rs::{Downcast, impl_downcast};
use dyn_clone::DynClone;
use itertools::Itertools;
use paste::paste;
use petgraph::dot::{Config, Dot};
use petgraph::graph::Graph;
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::Schema;
use risingwave_common::util::recursive::{self, Recurse};
use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
use serde::Serialize;

use self::batch::BatchPlanNodeMetadata;
use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanNodeMetadata;
use self::utils::Distill;
use super::property::{
    Distribution, FunctionalDependencySet, MonotonicityMap, Order, WatermarkColumns,
};
use crate::error::{ErrorCode, Result};
use crate::optimizer::property::StreamKind;
use crate::optimizer::{ExpressionSimplifyRewriter, PlanVisitor};
use crate::session::current::notice_to_user;
use crate::utils::{PrettySerde, build_graph_from_pretty};

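/// Marker trait for a plan "convention" (see [`Logical`], [`Batch`] and [`Stream`] below).
///
/// Each convention bundles together the pieces that differ between the three plan flavors: the
/// extra fields stored in [`PlanBase`], the object-safe node trait ([`Self::PlanRefDyn`]), the
/// generated node-type enum, and the share-node type used for DAG-shaped plans.
///
/// A minimal illustrative sketch of convention-generic code (hypothetical helper, not part of
/// this module):
///
/// ```ignore
/// fn plan_id<C: ConventionMarker>(plan: &PlanRef<C>) -> PlanNodeId {
///     plan.plan_base().id()
/// }
/// ```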
pub trait ConventionMarker: 'static + Sized + Clone + Debug + Eq + PartialEq + Hash {
    type Extra: 'static + Eq + Hash + Clone + Debug;
    type ShareNode: ShareNode<Self>;
    type PlanRefDyn: PlanNodeCommon<Self> + Eq + Hash + ?Sized;
    type PlanNodeType;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode>;
}

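/// A share node marks a subtree referenced by multiple parents, turning the plan tree into a
/// DAG. `new_share` wraps a [`generic::Share`] core into a plan node of the convention, and
/// `replace_input` swaps the shared input in place so that every parent observes the change.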
pub trait ShareNode<C: ConventionMarker>:
    AnyPlanNodeMeta<C> + PlanTreeNodeUnary<C> + 'static
{
    fn new_share(share: generic::Share<PlanRef<C>>) -> PlanRef<C>;
    fn replace_input(&self, plan: PlanRef<C>);
}

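/// Placeholder share-node type for conventions that do not support sharing (currently
/// [`Batch`]). The never-type field makes it uninhabited, so its trait methods can never be
/// reached at runtime.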
pub struct NoShareNode<C: ConventionMarker>(!, PhantomData<C>);

impl<C: ConventionMarker> ShareNode<C> for NoShareNode<C> {
    fn new_share(_plan: generic::Share<PlanRef<C>>) -> PlanRef<C> {
        unreachable!()
    }

    fn replace_input(&self, _plan: PlanRef<C>) {
        unreachable!()
    }
}

impl<C: ConventionMarker> PlanTreeNodeUnary<C> for NoShareNode<C> {
    fn input(&self) -> PlanRef<C> {
        unreachable!()
    }

    fn clone_with_input(&self, _input: PlanRef<C>) -> Self {
        unreachable!()
    }
}

impl<C: ConventionMarker> AnyPlanNodeMeta<C> for NoShareNode<C> {
    fn node_type(&self) -> C::PlanNodeType {
        unreachable!()
    }

    fn plan_base(&self) -> &PlanBase<C> {
        unreachable!()
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Logical;
impl ConventionMarker for Logical {
    type Extra = plan_base::NoExtra;
    type PlanNodeType = LogicalPlanNodeType;
    type PlanRefDyn = dyn LogicalPlanNode;
    type ShareNode = LogicalShare;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        plan.as_logical_share()
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Batch;
impl ConventionMarker for Batch {
    type Extra = plan_base::BatchExtra;
    type PlanNodeType = BatchPlanNodeType;
    type PlanRefDyn = dyn BatchPlanNode;
    type ShareNode = NoShareNode<Batch>;

    fn as_share(_plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        None
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Stream;
impl ConventionMarker for Stream {
    type Extra = plan_base::StreamExtra;
    type PlanNodeType = StreamPlanNodeType;
    type PlanRefDyn = dyn StreamPlanNode;
    type ShareNode = StreamShare;

    fn as_share(plan: &Self::PlanRefDyn) -> Option<&Self::ShareNode> {
        plan.as_stream_share()
    }
}

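/// Statically-dispatched metadata trait implemented for every concrete plan node (generated by
/// `impl_plan_node_meta` at the bottom of this file). It exposes the node-type constant and the
/// [`PlanBase`] of the node; the object-safe counterpart used behind `dyn` is
/// `AnyPlanNodeMeta`, blanket-implemented below.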
pub trait PlanNodeMeta {
    type Convention: ConventionMarker;
    const NODE_TYPE: <Self::Convention as ConventionMarker>::PlanNodeType;
    fn plan_base(&self) -> &PlanBase<Self::Convention>;
}

mod plan_node_meta {
    use super::*;

    pub trait AnyPlanNodeMeta<C: ConventionMarker> {
        fn node_type(&self) -> C::PlanNodeType;
        fn plan_base(&self) -> &PlanBase<C>;
    }

    impl<P> AnyPlanNodeMeta<P::Convention> for P
    where
        P: PlanNodeMeta,
    {
        fn node_type(&self) -> <P::Convention as ConventionMarker>::PlanNodeType {
            P::NODE_TYPE
        }

        fn plan_base(&self) -> &PlanBase<P::Convention> {
            <Self as PlanNodeMeta>::plan_base(self)
        }
    }
}
use plan_node_meta::AnyPlanNodeMeta;

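/// The common set of capabilities every plan node must provide regardless of convention: tree
/// navigation and reconstruction, object-safe clone/eq/hash, `EXPLAIN` rendering ([`Distill`]),
/// downcasting, expression rewriting/visiting, and access to plan-base metadata.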
pub trait PlanNodeCommon<C: ConventionMarker> = PlanTreeNode<C>
    + DynClone
    + DynEq
    + DynHash
    + Distill
    + Debug
    + Downcast
    + ExprRewritable<C>
    + ExprVisitable
    + AnyPlanNodeMeta<C>;

pub trait StreamPlanNode: PlanNodeCommon<Stream> + TryToStreamPb {}
pub trait BatchPlanNode:
    PlanNodeCommon<Batch> + ToDistributedBatch + ToLocalBatch + TryToBatchPb
{
}
pub trait LogicalPlanNode:
    PlanNodeCommon<Logical> + ColPrunable + PredicatePushdown + ToBatch + ToStream
{
}

macro_rules! impl_trait {
    ($($convention:ident),+) => {
        paste! {
            $(
                impl Hash for dyn [<$convention PlanNode>] {
                    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
                        self.dyn_hash(state);
                    }
                }

                impl PartialEq for dyn [<$convention PlanNode>] {
                    fn eq(&self, other: &Self) -> bool {
                        self.dyn_eq(other.as_dyn_eq())
                    }
                }

                impl Eq for dyn [<$convention PlanNode>] {}
            )+
        }
    };
}

impl_trait!(Batch, Stream, Logical);
impl_downcast!(BatchPlanNode);
impl_downcast!(LogicalPlanNode);
impl_downcast!(StreamPlanNode);

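/// A reference-counted handle to a type-erased plan node of convention `C`. Cloning a `PlanRef`
/// only bumps the [`Rc`] count; the node itself is not modified afterwards (share nodes, whose
/// input can be replaced in place, are the exception).
///
/// A small illustrative sketch, assuming `filter` is some concrete `LogicalFilter` node:
///
/// ```ignore
/// let plan: LogicalPlanRef = filter.into();      // erase the concrete type
/// assert!(plan.as_logical_filter().is_some());   // downcast back when needed
/// ```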
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Eq, Hash)]
pub struct PlanRef<C: ConventionMarker>(Rc<C::PlanRefDyn>);

impl<C: ConventionMarker> Clone for PlanRef<C> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

pub type LogicalPlanRef = PlanRef<Logical>;
pub type StreamPlanRef = PlanRef<Stream>;
pub type BatchPlanRef = PlanRef<Batch>;

#[allow(clippy::op_ref)]
impl<C: ConventionMarker> PartialEq for PlanRef<C> {
    fn eq(&self, other: &Self) -> bool {
        &self.0 == &other.0
    }
}

impl<C: ConventionMarker> Deref for PlanRef<C> {
    type Target = C::PlanRefDyn;

    fn deref(&self) -> &Self::Target {
        self.0.deref()
    }
}

impl<T: LogicalPlanNode> From<T> for PlanRef<Logical> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<T: StreamPlanNode> From<T> for PlanRef<Stream> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<T: BatchPlanNode> From<T> for PlanRef<Batch> {
    fn from(value: T) -> Self {
        PlanRef(Rc::new(value) as _)
    }
}

impl<C: ConventionMarker> Layer for PlanRef<C> {
    type Sub = Self;

    fn map<F>(self, f: F) -> Self
    where
        F: FnMut(Self::Sub) -> Self::Sub,
    {
        self.clone_root_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
    }

    fn descent<F>(&self, f: F)
    where
        F: FnMut(&Self::Sub),
    {
        self.inputs().iter().for_each(f);
    }
}

#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct PlanNodeId(pub i32);

impl PlanNodeId {
    pub fn to_stream_node_operator_id(self) -> StreamNodeLocalOperatorId {
        StreamNodeLocalOperatorId::new(self.0 as _)
    }
}

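/// DAG-aware transformation on top of [`Endo`]: subtrees under a [`LogicalShare`] are rewritten
/// only once and the result is reused for every parent, which both preserves sharing and avoids
/// redundant work on shared subtrees.
///
/// A minimal sketch of a memoizing `cached` implementation, assuming the rewriter carries a
/// hypothetical `cache: HashMap<PlanNodeId, LogicalPlanRef>` field:
///
/// ```ignore
/// fn cached<F>(&mut self, plan: LogicalPlanRef, mut f: F) -> LogicalPlanRef
/// where
///     F: FnMut(&mut Self) -> LogicalPlanRef,
/// {
///     if let Some(hit) = self.cache.get(&plan.id()) {
///         hit.clone()
///     } else {
///         let new = f(self);
///         self.cache.insert(plan.id(), new.clone());
///         new
///     }
/// }
/// ```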
pub trait EndoPlan: Endo<LogicalPlanRef> {
    fn cached<F>(&mut self, plan: LogicalPlanRef, f: F) -> LogicalPlanRef
    where
        F: FnMut(&mut Self) -> LogicalPlanRef;

    fn dag_apply(&mut self, plan: LogicalPlanRef) -> LogicalPlanRef {
        match plan.as_logical_share() {
            Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
            None => self.tree_apply(plan),
        }
    }
}

pub trait VisitPlan: Visit<LogicalPlanRef> {
    fn visited<F>(&mut self, plan: &LogicalPlanRef, f: F)
    where
        F: FnMut(&mut Self);

    fn dag_visit(&mut self, plan: &LogicalPlanRef) {
        match plan.as_logical_share() {
            Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
            None => self.tree_visit(plan),
        }
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef<C> {
        let new = self.rewrite_exprs(r);
        let inputs: Vec<PlanRef<C>> = new
            .inputs()
            .iter()
            .map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
            .collect();
        new.clone_root_with_inputs(&inputs[..])
    }
}

pub(crate) trait VisitExprsRecursive {
    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
}

impl<C: ConventionMarker> VisitExprsRecursive for PlanRef<C> {
    fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
        self.visit_exprs(r);
        self.inputs()
            .iter()
            .for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn expect_stream_key(&self) -> &[usize] {
        self.stream_key().unwrap_or_else(|| {
            panic!(
                "a stream key is expected but not exist, plan:\n{}",
                self.explain_to_string()
            )
        })
    }
}

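// DAG-aware implementations of column pruning and predicate pushdown. `LogicalShare` nodes need
// special handling: requirements from all parents are first accumulated in the context, and the
// shared input is rewritten only once, after the last parent has been visited.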
impl LogicalPlanRef {
    fn prune_col_inner(
        &self,
        required_cols: &[usize],
        ctx: &mut ColumnPruningContext,
    ) -> LogicalPlanRef {
        if let Some(logical_share) = self.as_logical_share() {
            if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
                if ctx.get_parent_num(logical_share) == 1 {
                    let input: LogicalPlanRef = logical_share.input();
                    return input.prune_col(required_cols, ctx);
                }

                if ctx.visit_share_at_first_round(self.id()) {
                    let new_logical_share: &LogicalShare = new_share
                        .as_logical_share()
                        .expect("must be share operator");
                    let new_share_input = new_logical_share.input().prune_col(
                        &(0..new_logical_share.base.schema().len()).collect_vec(),
                        ctx,
                    );
                    new_logical_share.replace_input(new_share_input);
                }

                let new_required_cols: Vec<usize> = required_cols
                    .iter()
                    .map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
                    .collect_vec();
                let mapping = ColIndexMapping::with_remaining_columns(
                    &new_required_cols,
                    new_share.schema().len(),
                );
                return LogicalProject::with_mapping(new_share, mapping).into();
            }

            let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
            if parent_has_pushed == ctx.get_parent_num(logical_share) {
                let merge_require_cols = ctx
                    .take_required_cols(self.id())
                    .expect("must have required columns")
                    .into_iter()
                    .flat_map(|x| x.into_iter())
                    .sorted()
                    .dedup()
                    .collect_vec();
                let input: LogicalPlanRef = logical_share.input();
                let input = input.prune_col(&merge_require_cols, ctx);

                let new_logical_share = LogicalShare::create(input.clone());
                ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());

                let exprs = logical_share
                    .base
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(i, field)| {
                        if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
                            ExprImpl::InputRef(Box::new(InputRef::new(
                                pos,
                                field.data_type.clone(),
                            )))
                        } else {
                            ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
                        }
                    })
                    .collect_vec();
                let project = LogicalProject::create(input, exprs);
                logical_share.replace_input(project);
            }
            let mapping =
                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
            LogicalProject::with_mapping(self.clone(), mapping).into()
        } else {
            let dyn_t = self.deref();
            dyn_t.prune_col(required_cols, ctx)
        }
    }

    fn predicate_pushdown_inner(
        &self,
        predicate: Condition,
        ctx: &mut PredicatePushdownContext,
    ) -> LogicalPlanRef {
        if let Some(logical_share) = self.as_logical_share() {
            if ctx.get_parent_num(logical_share) == 1 {
                let input: LogicalPlanRef = logical_share.input();
                return input.predicate_pushdown(predicate, ctx);
            }

            let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
            if parent_has_pushed == ctx.get_parent_num(logical_share) {
                let merge_predicate = ctx
                    .take_predicate(self.id())
                    .expect("must have predicate")
                    .into_iter()
                    .map(|mut c| Condition {
                        conjunctions: c
                            .conjunctions
                            .extract_if(.., |e| {
                                let mut finder = ExprCorrelatedIdFinder::default();
                                finder.visit_expr(e);
                                e.count_nows() == 0
                                    && e.is_pure()
                                    && !finder.has_correlated_input_ref()
                            })
                            .collect(),
                    })
                    .reduce(|a, b| a.or(b))
                    .unwrap();

                let mut expr_rewriter = ExpressionSimplifyRewriter {};
                let mut new_predicate = Condition::true_cond();

                for c in merge_predicate.conjunctions {
                    let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
                    new_predicate = new_predicate.and(c);
                }

                let input: LogicalPlanRef = logical_share.input();
                let input = input.predicate_pushdown(new_predicate, ctx);
                logical_share.replace_input(input);
            }
            LogicalFilter::create(self.clone(), predicate)
        } else {
            let dyn_t = self.deref();
            dyn_t.predicate_pushdown(predicate, ctx)
        }
    }

    pub fn forbid_snapshot_backfill(&self) -> Option<String> {
        struct ForbidSnapshotBackfill {
            warning_msg: Option<String>,
        }
        impl LogicalPlanVisitor for ForbidSnapshotBackfill {
            type Result = ();

            type DefaultBehavior = impl DefaultBehavior<Self::Result>;

            fn default_behavior() -> Self::DefaultBehavior {
                DefaultValue
            }

            fn visit_logical_join(&mut self, plan: &LogicalJoin) -> Self::Result {
                self.visit(plan.left());
                self.visit(plan.right());
                if self.warning_msg.is_none() && plan.should_be_temporal_join() {
                    self.warning_msg =
                        Some("snapshot backfill disabled due to temporal join".to_owned());
                }
            }

            fn visit_logical_source(&mut self, plan: &LogicalSource) -> Self::Result {
                if self.warning_msg.is_none() && plan.is_shared_source() {
                    self.warning_msg = Some(format!(
                        "snapshot backfill disabled due to using shared source {:?}",
                        plan.core.catalog.as_ref().map(|c| &c.name)
                    ));
                }
            }
        }
        let mut forbid_snapshot = ForbidSnapshotBackfill { warning_msg: None };
        forbid_snapshot.visit(self.clone());
        forbid_snapshot.warning_msg
    }
}

impl ColPrunable for LogicalPlanRef {
    #[allow(clippy::let_and_return)]
    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> LogicalPlanRef {
        let res = self.prune_col_inner(required_cols, ctx);
        #[cfg(debug_assertions)]
        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
            "column pruning",
            &LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
            &res,
        );
        res
    }
}

impl PredicatePushdown for LogicalPlanRef {
    #[allow(clippy::let_and_return)]
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        ctx: &mut PredicatePushdownContext,
    ) -> LogicalPlanRef {
        #[cfg(debug_assertions)]
        let predicate_clone = predicate.clone();

        let res = self.predicate_pushdown_inner(predicate, ctx);

        #[cfg(debug_assertions)]
        super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
            "predicate push down",
            &LogicalFilter::new(self.clone(), predicate_clone).into(),
            &res,
        );

        res
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn clone_root_with_inputs(&self, inputs: &[PlanRef<C>]) -> PlanRef<C> {
        if let Some(share) = self.as_share_node() {
            assert_eq!(inputs.len(), 1);
            share.replace_input(inputs[0].clone());
            self.clone()
        } else {
            let dyn_t = self.deref();
            dyn_t.clone_with_inputs(inputs)
        }
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn node_type(&self) -> C::PlanNodeType {
        self.0.node_type()
    }

    pub fn plan_base(&self) -> &PlanBase<C> {
        self.0.plan_base()
    }
}

impl<C: ConventionMarker> GenericPlanRef for PlanRef<C> {
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}

impl PhysicalPlanRef for BatchPlanRef {
    fn distribution(&self) -> &Distribution {
        self.plan_base().distribution()
    }
}

impl PhysicalPlanRef for StreamPlanRef {
    fn distribution(&self) -> &Distribution {
        self.plan_base().distribution()
    }
}

impl StreamPlanNodeMetadata for StreamPlanRef {
    fn stream_kind(&self) -> StreamKind {
        self.plan_base().stream_kind()
    }

    fn emit_on_window_close(&self) -> bool {
        self.plan_base().emit_on_window_close()
    }

    fn watermark_columns(&self) -> &WatermarkColumns {
        self.plan_base().watermark_columns()
    }

    fn columns_monotonicity(&self) -> &MonotonicityMap {
        self.plan_base().columns_monotonicity()
    }
}

impl BatchPlanNodeMetadata for BatchPlanRef {
    fn order(&self) -> &Order {
        self.plan_base().order()
    }

    fn orders(&self) -> Vec<Order> {
        self.plan_base().orders()
    }
}

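/// Reassigns plan node ids (and other context-allocated element ids) by cloning the whole plan
/// with a reset id counter, then restores the original counters. This keeps ids in `EXPLAIN`
/// output small and deterministic without disturbing later planning.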
pub fn reorganize_elements_id<C: ConventionMarker>(plan: PlanRef<C>) -> PlanRef<C> {
    let backup = plan.ctx().backup_elem_ids();
    plan.ctx().reset_elem_ids();
    let plan = PlanCloner::clone_whole_plan(plan);
    plan.ctx().restore_elem_ids(backup);
    plan
}

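/// `EXPLAIN` rendering for plans. The plan is first distilled into a [`Pretty`] tree and then
/// serialized into the requested format (text, JSON, XML, YAML, or Graphviz DOT).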
pub trait Explain {
    fn explain<'a>(&self) -> Pretty<'a>;

    fn explain_with_id<'a>(&self) -> Pretty<'a>;

    fn explain_to_string(&self) -> String;

    fn explain_to_json(&self) -> String;

    fn explain_to_xml(&self) -> String;

    fn explain_to_yaml(&self) -> String;

    fn explain_to_dot(&self) -> String;
}

impl<C: ConventionMarker> Explain for PlanRef<C> {
    fn explain<'a>(&self) -> Pretty<'a> {
        let mut node = self.distill();
        let inputs = self.inputs();
        for input in inputs.iter().peekable() {
            node.children.push(input.explain());
        }
        Pretty::Record(node)
    }

    fn explain_with_id<'a>(&self) -> Pretty<'a> {
        let node_id = self.id();
        let mut node = self.distill();
        node.fields
            .insert(0, ("id".into(), Pretty::display(&node_id.0)));
        let inputs = self.inputs();
        for input in inputs.iter().peekable() {
            node.children.push(input.explain_with_id());
        }
        Pretty::Record(node)
    }

    fn explain_to_string(&self) -> String {
        let plan = reorganize_elements_id(self.clone());

        let mut output = String::with_capacity(2048);
        let mut config = pretty_config();
        config.unicode(&mut output, &plan.explain());
        output
    }

    fn explain_to_json(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to json")
    }

    fn explain_to_xml(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        quick_xml::se::to_string(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to xml")
    }

    fn explain_to_yaml(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain();
        serde_yaml::to_string(&PrettySerde(explain_ir, true))
            .expect("failed to serialize plan to yaml")
    }

    fn explain_to_dot(&self) -> String {
        let plan = reorganize_elements_id(self.clone());
        let explain_ir = plan.explain_with_id();
        let mut graph = Graph::<String, String>::new();
        let mut nodes = HashMap::new();
        build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
        let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
        dot.to_string()
    }
}

impl<C: ConventionMarker> PlanRef<C> {
    pub fn as_share_node(&self) -> Option<&C::ShareNode> {
        C::as_share(self)
    }
}

pub(crate) fn pretty_config() -> PrettyConfig {
    PrettyConfig {
        indent: 3,
        need_boundaries: false,
        width: 2048,
        reduced_spaces: true,
    }
}

macro_rules! impl_generic_plan_ref_method {
    ($($convention:ident),+) => {
        paste! {
            $(
                impl dyn [<$convention PlanNode>] {
                    pub fn id(&self) -> PlanNodeId {
                        self.plan_base().id()
                    }

                    pub fn ctx(&self) -> OptimizerContextRef {
                        self.plan_base().ctx().clone()
                    }

                    pub fn schema(&self) -> &Schema {
                        self.plan_base().schema()
                    }

                    pub fn stream_key(&self) -> Option<&[usize]> {
                        self.plan_base().stream_key()
                    }

                    pub fn functional_dependency(&self) -> &FunctionalDependencySet {
                        self.plan_base().functional_dependency()
                    }

                    pub fn explain_myself_to_string(&self) -> String {
                        self.distill_to_string()
                    }
                }
            )+
        }
    };
}

impl_generic_plan_ref_method!(Batch, Stream, Logical);

pub const PLAN_DEPTH_THRESHOLD: usize = 30;
pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
Consider simplifying or splitting the query if you encounter any issues.";

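// Conversion of a stream plan into its protobuf representation for the fragmenter. The recursion
// tracker emits a user-facing notice once the plan exceeds `PLAN_DEPTH_THRESHOLD`; table scans,
// CDC/source scans, and share nodes go through their own `adhoc_to_stream_prost` because their
// protobuf form is not a plain parent-over-children tree.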
impl dyn StreamPlanNode {
    pub fn to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamPlan> {
        recursive::tracker!().recurse(|t| {
            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                notice_to_user(PLAN_TOO_DEEP_NOTICE);
            }

            use stream::prelude::*;

            if let Some(stream_table_scan) = self.as_stream_table_scan() {
                return stream_table_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
                return stream_cdc_table_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_source_scan) = self.as_stream_source_scan() {
                return stream_source_scan.adhoc_to_stream_prost(state);
            }
            if let Some(stream_share) = self.as_stream_share() {
                return stream_share.adhoc_to_stream_prost(state);
            }

            let node = Some(self.try_to_stream_prost_body(state)?);
            let input = self
                .inputs()
                .into_iter()
                .map(|plan| plan.to_stream_prost(state))
                .try_collect()?;
            Ok(PbStreamPlan {
                input,
                identity: self.explain_myself_to_string(),
                node_body: node,
                operator_id: self.id().to_stream_node_operator_id(),
                stream_key: self
                    .stream_key()
                    .unwrap_or_default()
                    .iter()
                    .map(|x| *x as u32)
                    .collect(),
                fields: self.schema().to_prost(),
                stream_kind: self.plan_base().stream_kind().to_protobuf() as i32,
            })
        })
    }
}

impl dyn BatchPlanNode {
    pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
        self.to_batch_prost_identity(true)
    }

    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
        recursive::tracker!().recurse(|t| {
            if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                notice_to_user(PLAN_TOO_DEEP_NOTICE);
            }

            let node_body = Some(self.try_to_batch_prost_body()?);
            let children = self
                .inputs()
                .into_iter()
                .map(|plan| plan.to_batch_prost_identity(identity))
                .try_collect()?;
            Ok(PbBatchPlan {
                children,
                identity: if identity {
                    self.explain_myself_to_string()
                } else {
                    "".into()
                },
                node_body,
            })
        })
    }
}

mod plan_base;
pub use plan_base::*;
#[macro_use]
mod plan_tree_node;
pub use plan_tree_node::*;
mod col_pruning;
pub use col_pruning::*;
mod expr_rewritable;
pub use expr_rewritable::*;
mod expr_visitable;

mod convert;
pub use convert::*;
mod eq_join_predicate;
pub use eq_join_predicate::*;
mod to_prost;
pub use to_prost::*;
mod predicate_pushdown;
pub use predicate_pushdown::*;
mod merge_eq_nodes;
pub use merge_eq_nodes::*;

pub mod batch;
pub mod generic;
pub mod stream;

pub use generic::{PlanAggCall, PlanAggCallDisplay};

mod batch_delete;
mod batch_exchange;
mod batch_expand;
mod batch_filter;
mod batch_get_channel_delta_stats;
mod batch_group_topn;
mod batch_hash_agg;
mod batch_hash_join;
mod batch_hop_window;
mod batch_insert;
mod batch_limit;
mod batch_log_seq_scan;
mod batch_lookup_join;
mod batch_max_one_row;
mod batch_nested_loop_join;
mod batch_over_window;
mod batch_project;
mod batch_project_set;
mod batch_seq_scan;
mod batch_simple_agg;
mod batch_sort;
mod batch_sort_agg;
mod batch_source;
mod batch_sys_seq_scan;
mod batch_table_function;
mod batch_topn;
mod batch_union;
mod batch_update;
mod batch_values;
mod logical_agg;
mod logical_apply;
mod logical_cdc_scan;
mod logical_changelog;
mod logical_dedup;
mod logical_delete;
mod logical_except;
mod logical_expand;
mod logical_filter;
mod logical_gap_fill;
mod logical_get_channel_delta_stats;
mod logical_hop_window;
mod logical_insert;
mod logical_intersect;
mod logical_join;
mod logical_kafka_scan;
mod logical_limit;
mod logical_locality_provider;
mod logical_max_one_row;
mod logical_multi_join;
mod logical_now;
mod logical_over_window;
mod logical_project;
mod logical_project_set;
mod logical_scan;
mod logical_share;
mod logical_source;
mod logical_sys_scan;
mod logical_table_function;
mod logical_topn;
mod logical_union;
mod logical_update;
mod logical_values;
mod stream_asof_join;
mod stream_changelog;
mod stream_dedup;
mod stream_delta_join;
mod stream_dml;
mod stream_dynamic_filter;
mod stream_eowc_gap_fill;
mod stream_eowc_over_window;
mod stream_exchange;
mod stream_expand;
mod stream_filter;
mod stream_fs_fetch;
mod stream_gap_fill;
mod stream_global_approx_percentile;
mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_join_common;
mod stream_local_approx_percentile;
mod stream_locality_provider;
mod stream_materialize;
mod stream_materialized_exprs;
mod stream_now;
mod stream_over_window;
mod stream_project;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_row_merge;
mod stream_simple_agg;
mod stream_sink;
mod stream_sort;
mod stream_source;
mod stream_source_scan;
mod stream_stateless_simple_agg;
mod stream_sync_log_store;
mod stream_table_scan;
mod stream_topn;
mod stream_union;
mod stream_values;
mod stream_watermark_filter;

mod batch_file_scan;
mod batch_iceberg_scan;
mod batch_kafka_scan;
mod batch_postgres_query;

mod batch_mysql_query;
mod derive;
mod logical_file_scan;
mod logical_iceberg_intermediate_scan;
mod logical_iceberg_scan;
mod logical_postgres_query;

mod batch_vector_search;
mod logical_mysql_query;
mod logical_vector_search;
mod logical_vector_search_lookup_join;
mod stream_cdc_table_scan;
mod stream_share;
mod stream_temporal_join;
mod stream_upstream_sink_union;
mod stream_vector_index_lookup_join;
mod stream_vector_index_write;
pub mod utils;

pub use batch_delete::BatchDelete;
pub use batch_exchange::BatchExchange;
pub use batch_expand::BatchExpand;
pub use batch_file_scan::BatchFileScan;
pub use batch_filter::BatchFilter;
pub use batch_get_channel_delta_stats::BatchGetChannelDeltaStats;
pub use batch_group_topn::BatchGroupTopN;
pub use batch_hash_agg::BatchHashAgg;
pub use batch_hash_join::BatchHashJoin;
pub use batch_hop_window::BatchHopWindow;
pub use batch_iceberg_scan::BatchIcebergScan;
pub use batch_insert::BatchInsert;
pub use batch_kafka_scan::BatchKafkaScan;
pub use batch_limit::BatchLimit;
pub use batch_log_seq_scan::BatchLogSeqScan;
pub use batch_lookup_join::BatchLookupJoin;
pub use batch_max_one_row::BatchMaxOneRow;
pub use batch_mysql_query::BatchMySqlQuery;
pub use batch_nested_loop_join::BatchNestedLoopJoin;
pub use batch_over_window::BatchOverWindow;
pub use batch_postgres_query::BatchPostgresQuery;
pub use batch_project::BatchProject;
pub use batch_project_set::BatchProjectSet;
pub use batch_seq_scan::BatchSeqScan;
pub use batch_simple_agg::BatchSimpleAgg;
pub use batch_sort::BatchSort;
pub use batch_sort_agg::BatchSortAgg;
pub use batch_source::BatchSource;
pub use batch_sys_seq_scan::BatchSysSeqScan;
pub use batch_table_function::BatchTableFunction;
pub use batch_topn::BatchTopN;
pub use batch_union::BatchUnion;
pub use batch_update::BatchUpdate;
pub use batch_values::BatchValues;
pub use batch_vector_search::BatchVectorSearch;
pub use logical_agg::LogicalAgg;
pub use logical_apply::LogicalApply;
pub use logical_cdc_scan::LogicalCdcScan;
pub use logical_changelog::LogicalChangeLog;
pub use logical_dedup::LogicalDedup;
pub use logical_delete::LogicalDelete;
pub use logical_except::LogicalExcept;
pub use logical_expand::LogicalExpand;
pub use logical_file_scan::LogicalFileScan;
pub use logical_filter::LogicalFilter;
pub use logical_gap_fill::LogicalGapFill;
pub use logical_get_channel_delta_stats::LogicalGetChannelDeltaStats;
pub use logical_hop_window::LogicalHopWindow;
pub use logical_iceberg_intermediate_scan::LogicalIcebergIntermediateScan;
pub use logical_iceberg_scan::LogicalIcebergScan;
pub use logical_insert::LogicalInsert;
pub use logical_intersect::LogicalIntersect;
pub use logical_join::LogicalJoin;
pub use logical_kafka_scan::LogicalKafkaScan;
pub use logical_limit::LogicalLimit;
pub use logical_locality_provider::LogicalLocalityProvider;
pub use logical_max_one_row::LogicalMaxOneRow;
pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
pub use logical_mysql_query::LogicalMySqlQuery;
pub use logical_now::LogicalNow;
pub use logical_over_window::LogicalOverWindow;
pub use logical_postgres_query::LogicalPostgresQuery;
pub use logical_project::LogicalProject;
pub use logical_project_set::LogicalProjectSet;
pub use logical_scan::LogicalScan;
pub use logical_share::LogicalShare;
pub use logical_source::LogicalSource;
pub use logical_sys_scan::LogicalSysScan;
pub use logical_table_function::LogicalTableFunction;
pub use logical_topn::LogicalTopN;
pub use logical_union::LogicalUnion;
pub use logical_update::LogicalUpdate;
pub use logical_values::LogicalValues;
pub use logical_vector_search::LogicalVectorSearch;
pub use logical_vector_search_lookup_join::LogicalVectorSearchLookupJoin;
use risingwave_pb::id::StreamNodeLocalOperatorId;
pub use stream_asof_join::StreamAsOfJoin;
pub use stream_cdc_table_scan::StreamCdcTableScan;
pub use stream_changelog::StreamChangeLog;
pub use stream_dedup::StreamDedup;
pub use stream_delta_join::StreamDeltaJoin;
pub use stream_dml::StreamDml;
pub use stream_dynamic_filter::StreamDynamicFilter;
pub use stream_eowc_gap_fill::StreamEowcGapFill;
pub use stream_eowc_over_window::StreamEowcOverWindow;
pub use stream_exchange::StreamExchange;
pub use stream_expand::StreamExpand;
pub use stream_filter::StreamFilter;
pub use stream_fs_fetch::StreamFsFetch;
pub use stream_gap_fill::StreamGapFill;
pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
use stream_join_common::StreamJoinCommon;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_locality_provider::StreamLocalityProvider;
pub use stream_materialize::StreamMaterialize;
pub use stream_materialized_exprs::StreamMaterializedExprs;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_row_merge::StreamRowMerge;
pub use stream_share::StreamShare;
pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
pub use stream_sort::StreamEowcSort;
pub use stream_source::StreamSource;
pub use stream_source_scan::StreamSourceScan;
pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
pub use stream_sync_log_store::StreamSyncLogStore;
pub use stream_table_scan::StreamTableScan;
pub use stream_temporal_join::StreamTemporalJoin;
pub use stream_topn::StreamTopN;
pub use stream_union::StreamUnion;
pub use stream_upstream_sink_union::StreamUpstreamSinkUnion;
pub use stream_values::StreamValues;
pub use stream_vector_index_lookup_join::StreamVectorIndexLookupJoin;
pub use stream_vector_index_write::StreamVectorIndexWrite;
pub use stream_watermark_filter::StreamWatermarkFilter;

use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_rewriter::PlanCloner;
use crate::optimizer::plan_visitor::{
    DefaultBehavior, DefaultValue, ExprCorrelatedIdFinder, LogicalPlanVisitor,
};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};

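/// Enumerates every `{ Convention, NodeName }` pair in the system and forwards the whole list to
/// the callback macro given as the first argument (plus any extra tokens). This single source of
/// truth drives the generated node-type enums, `PlanNodeMeta` impls, and downcast helpers below.
///
/// A sketch of a hypothetical caller, just to show the calling convention:
///
/// ```ignore
/// macro_rules! count_plan_nodes {
///     ($({ $convention:ident, $name:ident }),* $(,)?) => {
///         pub const PLAN_NODE_COUNT: usize = [$(stringify!($name)),*].len();
///     };
/// }
/// for_all_plan_nodes! { count_plan_nodes }
/// ```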
#[macro_export]
macro_rules! for_all_plan_nodes {
    ($macro:path $(,$rest:tt)*) => {
        $macro! {
            { Logical, Agg }
            , { Logical, Apply }
            , { Logical, Filter }
            , { Logical, Project }
            , { Logical, Scan }
            , { Logical, CdcScan }
            , { Logical, SysScan }
            , { Logical, Source }
            , { Logical, Insert }
            , { Logical, Delete }
            , { Logical, Update }
            , { Logical, Join }
            , { Logical, Values }
            , { Logical, Limit }
            , { Logical, TopN }
            , { Logical, HopWindow }
            , { Logical, TableFunction }
            , { Logical, MultiJoin }
            , { Logical, Expand }
            , { Logical, ProjectSet }
            , { Logical, Union }
            , { Logical, OverWindow }
            , { Logical, Share }
            , { Logical, Now }
            , { Logical, Dedup }
            , { Logical, Intersect }
            , { Logical, Except }
            , { Logical, MaxOneRow }
            , { Logical, KafkaScan }
            , { Logical, IcebergScan }
            , { Logical, IcebergIntermediateScan }
            , { Logical, ChangeLog }
            , { Logical, FileScan }
            , { Logical, PostgresQuery }
            , { Logical, MySqlQuery }
            , { Logical, GapFill }
            , { Logical, VectorSearch }
            , { Logical, GetChannelDeltaStats }
            , { Logical, LocalityProvider }
            , { Logical, VectorSearchLookupJoin }
            , { Batch, SimpleAgg }
            , { Batch, HashAgg }
            , { Batch, SortAgg }
            , { Batch, Project }
            , { Batch, Filter }
            , { Batch, Insert }
            , { Batch, Delete }
            , { Batch, Update }
            , { Batch, SeqScan }
            , { Batch, SysSeqScan }
            , { Batch, LogSeqScan }
            , { Batch, HashJoin }
            , { Batch, NestedLoopJoin }
            , { Batch, Values }
            , { Batch, Sort }
            , { Batch, Exchange }
            , { Batch, Limit }
            , { Batch, TopN }
            , { Batch, HopWindow }
            , { Batch, TableFunction }
            , { Batch, Expand }
            , { Batch, LookupJoin }
            , { Batch, ProjectSet }
            , { Batch, Union }
            , { Batch, GroupTopN }
            , { Batch, Source }
            , { Batch, OverWindow }
            , { Batch, MaxOneRow }
            , { Batch, KafkaScan }
            , { Batch, IcebergScan }
            , { Batch, FileScan }
            , { Batch, PostgresQuery }
            , { Batch, MySqlQuery }
            , { Batch, GetChannelDeltaStats }
            , { Batch, VectorSearch }
            , { Stream, Project }
            , { Stream, Filter }
            , { Stream, TableScan }
            , { Stream, CdcTableScan }
            , { Stream, Sink }
            , { Stream, Source }
            , { Stream, SourceScan }
            , { Stream, HashJoin }
            , { Stream, Exchange }
            , { Stream, HashAgg }
            , { Stream, SimpleAgg }
            , { Stream, StatelessSimpleAgg }
            , { Stream, Materialize }
            , { Stream, TopN }
            , { Stream, HopWindow }
            , { Stream, DeltaJoin }
            , { Stream, Expand }
            , { Stream, DynamicFilter }
            , { Stream, ProjectSet }
            , { Stream, GroupTopN }
            , { Stream, Union }
            , { Stream, RowIdGen }
            , { Stream, Dml }
            , { Stream, Now }
            , { Stream, Share }
            , { Stream, WatermarkFilter }
            , { Stream, TemporalJoin }
            , { Stream, Values }
            , { Stream, Dedup }
            , { Stream, EowcOverWindow }
            , { Stream, EowcSort }
            , { Stream, OverWindow }
            , { Stream, FsFetch }
            , { Stream, ChangeLog }
            , { Stream, GlobalApproxPercentile }
            , { Stream, LocalApproxPercentile }
            , { Stream, RowMerge }
            , { Stream, AsOfJoin }
            , { Stream, SyncLogStore }
            , { Stream, MaterializedExprs }
            , { Stream, VectorIndexWrite }
            , { Stream, VectorIndexLookupJoin }
            , { Stream, UpstreamSinkUnion }
            , { Stream, LocalityProvider }
            , { Stream, EowcGapFill }
            , { Stream, GapFill }
            $(,$rest)*
        }
    };
}

#[macro_export]
macro_rules! for_each_convention_all_plan_nodes {
    ($macro:path $(,$rest:tt)*) => {
        $crate::for_all_plan_nodes! {
            $crate::for_each_convention_all_plan_nodes
            , $macro
            $(,$rest)*
        }
    };
    (
        $( { Logical, $logical_name:ident } ),*
        , $( { Batch, $batch_name:ident } ),*
        , $( { Stream, $stream_name:ident } ),*
        , $macro:path $(,$rest:tt)*
    ) => {
        $macro! {
            {
                Logical, { $( $logical_name ),* },
                Batch, { $( $batch_name ),* },
                Stream, { $( $stream_name ),* }
            }
            $(,$rest)*
        }
    }
}

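// Generates, per convention, the `{Logical,Batch,Stream}PlanNodeType` enum plus the
// `PlanNodeMeta` and `Deref<Target = PlanBase<_>>` impls for every concrete node struct.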
macro_rules! impl_plan_node_meta {
    ({
        $( $convention:ident, { $( $name:ident ),* }),*
    }) => {
        paste!{
            $(
                #[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
                pub enum [<$convention PlanNodeType>] {
                    $( [<$convention $name>] ),*
                }
            )*
            $(
                $(impl PlanNodeMeta for [<$convention $name>] {
                    type Convention = $convention;
                    const NODE_TYPE: [<$convention PlanNodeType>] = [<$convention PlanNodeType>]::[<$convention $name>];

                    fn plan_base(&self) -> &PlanBase<$convention> {
                        &self.base
                    }
                }

                impl Deref for [<$convention $name>] {
                    type Target = PlanBase<$convention>;

                    fn deref(&self) -> &Self::Target {
                        &self.base
                    }
                })*
            )*
        }
    }
}

for_each_convention_all_plan_nodes! { impl_plan_node_meta }

macro_rules! impl_plan_node {
    ($({ $convention:ident, $name:ident }),*) => {
        paste!{
            $(impl [<$convention PlanNode>] for [<$convention $name>] { })*
        }
    }
}

for_all_plan_nodes! { impl_plan_node }

macro_rules! impl_down_cast_fn {
    ({
        $( $convention:ident, { $( $name:ident ),* }),*
    }) => {
        paste!{
            $(
                impl dyn [<$convention PlanNode>] {
                    $( pub fn [< as_ $convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
                        self.downcast_ref::<[<$convention $name>]>()
                    } )*
                }
            )*
        }
    }
}

for_each_convention_all_plan_nodes! { impl_down_cast_fn }