//! Defines all kinds of plan nodes in the optimizer plan tree, each representing a relational
//! operator under one of three conventions: logical, batch, or stream.
//!
//! Plan nodes are immutable and reference-counted: a [`PlanRef`] wraps an `Rc<dyn PlanNode>`, so
//! cloning a plan is cheap and rewrites build new nodes that share unchanged subtrees instead of
//! mutating in place.

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Deref;
use std::rc::Rc;
use downcast_rs::{impl_downcast, Downcast};
use dyn_clone::DynClone;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use paste::paste;
use petgraph::dot::{Config, Dot};
use petgraph::graph::{Graph, NodeIndex};
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::Schema;
use risingwave_common::util::recursive::{self, Recurse};
use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
use serde::Serialize;
use smallvec::SmallVec;
use self::batch::BatchPlanRef;
use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, MonotonicityMap, Order};
use crate::error::{ErrorCode, Result};
use crate::optimizer::ExpressionSimplifyRewriter;
use crate::session::current::notice_to_user;
use crate::utils::PrettySerde;
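/// A marker trait for plan node conventions, used to enforce type safety at compile time.
///
/// Implementors are [`Logical`], [`Batch`] and [`Stream`]. `Extra` is the convention-specific
/// part stored in [`PlanBase`], and `value()` returns the corresponding runtime [`Convention`].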
pub trait ConventionMarker: 'static + Sized {
type Extra: 'static + Eq + Hash + Clone + Debug;
fn value() -> Convention;
}
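/// The marker for the logical convention.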
pub struct Logical;
impl ConventionMarker for Logical {
type Extra = plan_base::NoExtra;
fn value() -> Convention {
Convention::Logical
}
}
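/// The marker for the batch convention.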
pub struct Batch;
impl ConventionMarker for Batch {
type Extra = plan_base::BatchExtra;
fn value() -> Convention {
Convention::Batch
}
}
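/// The marker for the stream convention.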
pub struct Stream;
impl ConventionMarker for Stream {
type Extra = plan_base::StreamExtra;
fn value() -> Convention {
Convention::Stream
}
}
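/// The trait for accessing the metadata and [`PlanBase`] of a plan node whose convention is
/// statically known, where `NODE_TYPE` is the [`PlanNodeType`] variant of the implementor.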
pub trait PlanNodeMeta {
type Convention: ConventionMarker;
const NODE_TYPE: PlanNodeType;
fn plan_base(&self) -> &PlanBase<Self::Convention>;
fn plan_base_ref(&self) -> PlanBaseRef<'_>;
}
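// Kept in a private module so that `AnyPlanNodeMeta` below is only reachable through
// `PlanNode` and `PlanRef`, rather than being implemented or named elsewhere.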
mod plan_node_meta {
use super::*;
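/// The object-safe counterpart of [`PlanNodeMeta`], used as a supertrait of [`PlanNode`] so
/// that the metadata can also be accessed through `dyn PlanNode` and [`PlanRef`].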
pub trait AnyPlanNodeMeta {
fn node_type(&self) -> PlanNodeType;
fn plan_base(&self) -> PlanBaseRef<'_>;
fn convention(&self) -> Convention;
}
impl<P> AnyPlanNodeMeta for P
where
P: PlanNodeMeta,
{
fn node_type(&self) -> PlanNodeType {
P::NODE_TYPE
}
fn plan_base(&self) -> PlanBaseRef<'_> {
PlanNodeMeta::plan_base_ref(self)
}
fn convention(&self) -> Convention {
P::Convention::value()
}
}
}
use plan_node_meta::AnyPlanNodeMeta;
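/// The common trait over all plan nodes. The optimizer framework treats every node as a
/// `dyn PlanNode`, which bundles tree manipulation, equality and hashing, explain support
/// ([`Distill`]), column pruning, expression rewriting and visiting, predicate pushdown, and
/// conversion to batch/stream plans and their protobuf representations.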
pub trait PlanNode:
PlanTreeNode
+ DynClone
+ DynEq
+ DynHash
+ Distill
+ Debug
+ Downcast
+ ColPrunable
+ ExprRewritable
+ ExprVisitable
+ ToBatch
+ ToStream
+ ToDistributedBatch
+ ToPb
+ ToLocalBatch
+ PredicatePushdown
+ AnyPlanNodeMeta
{
}
impl Hash for dyn PlanNode {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.dyn_hash(state);
}
}
impl PartialEq for dyn PlanNode {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other.as_dyn_eq())
}
}
impl Eq for dyn PlanNode {}
impl_downcast!(PlanNode);
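/// A reference to a [`PlanNode`], shared via [`Rc`]. Cloning is cheap, and shared subtrees
/// (e.g. under [`LogicalShare`]) turn the plan tree into a DAG.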
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Clone, Debug, Eq, Hash)]
pub struct PlanRef(Rc<dyn PlanNode>);
#[allow(clippy::op_ref)]
impl PartialEq for PlanRef {
fn eq(&self, other: &Self) -> bool {
&self.0 == &other.0
}
}
impl Deref for PlanRef {
type Target = dyn PlanNode;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<T: PlanNode> From<T> for PlanRef {
fn from(value: T) -> Self {
PlanRef(Rc::new(value))
}
}
impl Layer for PlanRef {
type Sub = Self;
fn map<F>(self, f: F) -> Self
where
F: FnMut(Self::Sub) -> Self::Sub,
{
self.clone_with_inputs(&self.inputs().into_iter().map(f).collect_vec())
}
fn descent<F>(&self, f: F)
where
F: FnMut(&Self::Sub),
{
self.inputs().iter().for_each(f);
}
}
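/// The numeric id assigned to each plan node by the optimizer context.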
#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct PlanNodeId(pub i32);
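/// An [`Endo`] over [`PlanRef`] that is aware of the DAG structure introduced by shared
/// subtrees: `dag_apply` routes [`LogicalShare`] nodes through `cached` so that each shared
/// subtree is transformed only once and its result can be reused.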
pub trait EndoPlan: Endo<PlanRef> {
fn cached<F>(&mut self, plan: PlanRef, f: F) -> PlanRef
where
F: FnMut(&mut Self) -> PlanRef;
fn dag_apply(&mut self, plan: PlanRef) -> PlanRef {
match plan.as_logical_share() {
Some(_) => self.cached(plan.clone(), |this| this.tree_apply(plan.clone())),
None => self.tree_apply(plan),
}
}
}
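/// A [`Visit`] over [`PlanRef`] that is aware of shared subtrees: `dag_visit` routes
/// [`LogicalShare`] nodes through `visited` so that each shared subtree is visited only once.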
pub trait VisitPlan: Visit<PlanRef> {
fn visited<F>(&mut self, plan: &PlanRef, f: F)
where
F: FnMut(&mut Self);
fn dag_visit(&mut self, plan: &PlanRef) {
match plan.as_logical_share() {
Some(_) => self.visited(plan, |this| this.tree_visit(plan)),
None => self.tree_visit(plan),
}
}
}
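/// The runtime representation of a plan node's convention; see [`ConventionMarker`] for the
/// type-level counterpart.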
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Convention {
Logical,
Batch,
Stream,
}
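/// Rewrites the expressions in a plan node and, recursively, in all of its inputs.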
pub(crate) trait RewriteExprsRecursive {
fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef;
}
impl RewriteExprsRecursive for PlanRef {
fn rewrite_exprs_recursive(&self, r: &mut impl ExprRewriter) -> PlanRef {
let new = self.rewrite_exprs(r);
let inputs: Vec<PlanRef> = new
.inputs()
.iter()
.map(|plan_ref| plan_ref.rewrite_exprs_recursive(r))
.collect();
new.clone_with_inputs(&inputs[..])
}
}
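/// Visits the expressions in a plan node and, recursively, in all of its inputs.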
pub(crate) trait VisitExprsRecursive {
fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor);
}
impl VisitExprsRecursive for PlanRef {
fn visit_exprs_recursive(&self, r: &mut impl ExprVisitor) {
self.visit_exprs(r);
self.inputs()
.iter()
.for_each(|plan_ref| plan_ref.visit_exprs_recursive(r));
}
}
impl PlanRef {
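/// Returns the stream key of the plan, panicking with the explained plan if it does not exist.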
pub fn expect_stream_key(&self) -> &[usize] {
self.stream_key().unwrap_or_else(|| {
panic!(
"a stream key is expected but not exist, plan:\n{}",
self.explain_to_string()
)
})
}
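/// Column pruning with special handling for [`LogicalShare`]: each parent first registers the
/// columns it requires; once all parents have pushed, the merged column set is pruned through
/// the share's input, a pruned copy of the share is cached for the remaining parents, and the
/// old share's input is replaced by a projection that pads the pruned-away columns with NULL
/// literals so its schema stays unchanged. Non-share nodes simply delegate to their own
/// `prune_col` implementation.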
fn prune_col_inner(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
if let Some(logical_share) = self.as_logical_share() {
if let Some((new_share, merge_required_cols)) = ctx.get_share_cache(self.id()) {
if ctx.get_parent_num(logical_share) == 1 {
let input: PlanRef = logical_share.input();
return input.prune_col(required_cols, ctx);
}
if ctx.visit_share_at_second_round(self.id()) {
let new_logical_share: &LogicalShare = new_share
.as_logical_share()
.expect("must be share operator");
let new_share_input = new_logical_share.input().prune_col(
&(0..new_logical_share.base.schema().len()).collect_vec(),
ctx,
);
new_logical_share.replace_input(new_share_input);
}
let new_required_cols: Vec<usize> = required_cols
.iter()
.map(|col| merge_required_cols.iter().position(|x| x == col).unwrap())
.collect_vec();
let mapping = ColIndexMapping::with_remaining_columns(
&new_required_cols,
new_share.schema().len(),
);
return LogicalProject::with_mapping(new_share, mapping).into();
}
let parent_has_pushed = ctx.add_required_cols(self.id(), required_cols.into());
if parent_has_pushed == ctx.get_parent_num(logical_share) {
let merge_require_cols = ctx
.take_required_cols(self.id())
.expect("must have required columns")
.into_iter()
.flat_map(|x| x.into_iter())
.sorted()
.dedup()
.collect_vec();
let input: PlanRef = logical_share.input();
let input = input.prune_col(&merge_require_cols, ctx);
let new_logical_share = LogicalShare::create(input.clone());
ctx.add_share_cache(self.id(), new_logical_share, merge_require_cols.clone());
let exprs = logical_share
.base
.schema()
.fields
.iter()
.enumerate()
.map(|(i, field)| {
if let Some(pos) = merge_require_cols.iter().position(|x| *x == i) {
ExprImpl::InputRef(Box::new(InputRef::new(
pos,
field.data_type.clone(),
)))
} else {
ExprImpl::Literal(Box::new(Literal::new(None, field.data_type.clone())))
}
})
.collect_vec();
let project = LogicalProject::create(input, exprs);
logical_share.replace_input(project);
}
let mapping =
ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
LogicalProject::with_mapping(self.clone(), mapping).into()
} else {
let dyn_t = self.deref();
dyn_t.prune_col(required_cols, ctx)
}
}
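/// Predicate pushdown with special handling for [`LogicalShare`]: each parent first registers
/// its predicate; once all parents have pushed, only the pure, `now()`-free and non-correlated
/// conjunctions are kept, the per-parent predicates are OR-ed together, simplified, and pushed
/// into the share's input. Each parent still keeps a [`LogicalFilter`] with its original
/// predicate on top of the share, so correctness does not depend on what was pushed down.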
fn predicate_pushdown_inner(
&self,
predicate: Condition,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
if let Some(logical_share) = self.as_logical_share() {
if ctx.get_parent_num(logical_share) == 1 {
let input: PlanRef = logical_share.input();
return input.predicate_pushdown(predicate, ctx);
}
let parent_has_pushed = ctx.add_predicate(self.id(), predicate.clone());
if parent_has_pushed == ctx.get_parent_num(logical_share) {
let merge_predicate = ctx
.take_predicate(self.id())
.expect("must have predicate")
.into_iter()
.map(|mut c| Condition {
conjunctions: c
.conjunctions
.extract_if(|e| {
let mut finder = ExprCorrelatedIdFinder::default();
finder.visit_expr(e);
e.count_nows() == 0
&& e.is_pure()
&& !finder.has_correlated_input_ref()
})
.collect(),
})
.reduce(|a, b| a.or(b))
.unwrap();
let mut expr_rewriter = ExpressionSimplifyRewriter {};
let mut new_predicate = Condition::true_cond();
for c in merge_predicate.conjunctions {
let c = Condition::with_expr(expr_rewriter.rewrite_cond(c));
new_predicate = new_predicate.and(c);
}
let input: PlanRef = logical_share.input();
let input = input.predicate_pushdown(new_predicate, ctx);
logical_share.replace_input(input);
}
LogicalFilter::create(self.clone(), predicate)
} else {
let dyn_t = self.deref();
dyn_t.predicate_pushdown(predicate, ctx)
}
}
}
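// In debug builds, verify that the pruned plan is equivalent to projecting the required
// columns out of the original plan.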
impl ColPrunable for PlanRef {
#[allow(clippy::let_and_return)]
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let res = self.prune_col_inner(required_cols, ctx);
#[cfg(debug_assertions)]
super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
"column pruning",
&LogicalProject::with_out_col_idx(self.clone(), required_cols.iter().cloned()).into(),
&res,
);
res
}
}
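// In debug builds, verify that the pushed-down plan is equivalent to filtering the original
// plan with the predicate.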
impl PredicatePushdown for PlanRef {
#[allow(clippy::let_and_return)]
fn predicate_pushdown(
&self,
predicate: Condition,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
#[cfg(debug_assertions)]
let predicate_clone = predicate.clone();
let res = self.predicate_pushdown_inner(predicate, ctx);
#[cfg(debug_assertions)]
super::heuristic_optimizer::HeuristicOptimizer::check_equivalent_plan(
"predicate push down",
&LogicalFilter::new(self.clone(), predicate_clone).into(),
&res,
);
res
}
}
impl PlanTreeNode for PlanRef {
fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
let dyn_t = self.deref();
dyn_t.inputs()
}
fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
if let Some(logical_share) = self.clone().as_logical_share() {
assert_eq!(inputs.len(), 1);
logical_share.replace_input(inputs[0].clone());
self.clone()
} else if let Some(stream_share) = self.clone().as_stream_share() {
assert_eq!(inputs.len(), 1);
stream_share.replace_input(inputs[0].clone());
self.clone()
} else {
let dyn_t = self.deref();
dyn_t.clone_with_inputs(inputs)
}
}
}
impl AnyPlanNodeMeta for PlanRef {
fn node_type(&self) -> PlanNodeType {
self.0.node_type()
}
fn plan_base(&self) -> PlanBaseRef<'_> {
self.0.plan_base()
}
fn convention(&self) -> Convention {
self.0.convention()
}
}
impl GenericPlanRef for PlanRef {
fn id(&self) -> PlanNodeId {
self.plan_base().id()
}
fn schema(&self) -> &Schema {
self.plan_base().schema()
}
fn stream_key(&self) -> Option<&[usize]> {
self.plan_base().stream_key()
}
fn ctx(&self) -> OptimizerContextRef {
self.plan_base().ctx()
}
fn functional_dependency(&self) -> &FunctionalDependencySet {
self.plan_base().functional_dependency()
}
}
impl PhysicalPlanRef for PlanRef {
fn distribution(&self) -> &Distribution {
self.plan_base().distribution()
}
}
impl StreamPlanRef for PlanRef {
fn append_only(&self) -> bool {
self.plan_base().append_only()
}
fn emit_on_window_close(&self) -> bool {
self.plan_base().emit_on_window_close()
}
fn watermark_columns(&self) -> &FixedBitSet {
self.plan_base().watermark_columns()
}
fn columns_monotonicity(&self) -> &MonotonicityMap {
self.plan_base().columns_monotonicity()
}
}
impl BatchPlanRef for PlanRef {
fn order(&self) -> &Order {
self.plan_base().order()
}
}
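/// Clones the whole plan with expression display ids and plan node ids reset to start from 0,
/// so that `EXPLAIN` output is stable regardless of how many ids were allocated earlier in the
/// session; the context's original counters are restored afterwards.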
pub fn reorganize_elements_id(plan: PlanRef) -> PlanRef {
let old_expr_display_id = plan.ctx().get_expr_display_id();
let old_plan_node_id = plan.ctx().get_plan_node_id();
plan.ctx().set_expr_display_id(0);
plan.ctx().set_plan_node_id(0);
let plan = PlanCloner::clone_whole_plan(plan);
plan.ctx().set_expr_display_id(old_expr_display_id);
plan.ctx().set_plan_node_id(old_plan_node_id);
plan
}
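/// Renders a plan for `EXPLAIN`, either as a pretty tree or serialized to string, JSON, XML,
/// YAML or DOT form.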
pub trait Explain {
fn explain<'a>(&self) -> Pretty<'a>;
fn explain_with_id<'a>(&self) -> Pretty<'a>;
fn explain_to_string(&self) -> String;
fn explain_to_json(&self) -> String;
fn explain_to_xml(&self) -> String;
fn explain_to_yaml(&self) -> String;
fn explain_to_dot(&self) -> String;
}
impl Explain for PlanRef {
fn explain<'a>(&self) -> Pretty<'a> {
let mut node = self.distill();
let inputs = self.inputs();
for input in inputs.iter() {
node.children.push(input.explain());
}
Pretty::Record(node)
}
fn explain_with_id<'a>(&self) -> Pretty<'a> {
let node_id = self.id();
let mut node = self.distill();
node.fields
.insert(0, ("id".into(), Pretty::display(&node_id.0)));
let inputs = self.inputs();
for input in inputs.iter() {
node.children.push(input.explain_with_id());
}
Pretty::Record(node)
}
fn explain_to_string(&self) -> String {
let plan = reorganize_elements_id(self.clone());
let mut output = String::with_capacity(2048);
let mut config = pretty_config();
config.unicode(&mut output, &plan.explain());
output
}
fn explain_to_json(&self) -> String {
let plan = reorganize_elements_id(self.clone());
let explain_ir = plan.explain();
serde_json::to_string_pretty(&PrettySerde(explain_ir, true))
.expect("failed to serialize plan to json")
}
fn explain_to_xml(&self) -> String {
let plan = reorganize_elements_id(self.clone());
let explain_ir = plan.explain();
quick_xml::se::to_string(&PrettySerde(explain_ir, true))
.expect("failed to serialize plan to xml")
}
fn explain_to_yaml(&self) -> String {
let plan = reorganize_elements_id(self.clone());
let explain_ir = plan.explain();
serde_yaml::to_string(&PrettySerde(explain_ir, true))
.expect("failed to serialize plan to yaml")
}
fn explain_to_dot(&self) -> String {
let plan = reorganize_elements_id(self.clone());
let explain_ir = plan.explain_with_id();
let mut graph = Graph::<String, String>::new();
let mut nodes = HashMap::new();
build_graph_from_pretty(&explain_ir, &mut graph, &mut nodes, None);
let dot = Dot::with_config(&graph, &[Config::EdgeNoLabel]);
dot.to_string()
}
}
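/// Recursively converts an explain [`Pretty`] tree into a petgraph [`Graph`] for DOT output,
/// adding one graph node per distinct label and a `contains` edge from each parent to its
/// children.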
fn build_graph_from_pretty(
pretty: &Pretty<'_>,
graph: &mut Graph<String, String>,
nodes: &mut HashMap<String, NodeIndex>,
parent_label: Option<&str>,
) {
if let Pretty::Record(r) = pretty {
let mut label = String::new();
label.push_str(&r.name);
for (k, v) in &r.fields {
label.push('\n');
label.push_str(k);
label.push_str(": ");
label.push_str(
&serde_json::to_string(&PrettySerde(v.clone(), false))
.expect("failed to serialize plan to dot"),
);
}
if !r.fields.is_empty() {
label.push('\n');
}
let current_node = *nodes
.entry(label.clone())
.or_insert_with(|| graph.add_node(label.clone()));
if let Some(parent_label) = parent_label {
if let Some(&parent_node) = nodes.get(parent_label) {
graph.add_edge(parent_node, current_node, "contains".to_string());
}
}
for child in &r.children {
build_graph_from_pretty(child, graph, nodes, Some(&label));
}
}
}
pub(crate) fn pretty_config() -> PrettyConfig {
PrettyConfig {
indent: 3,
need_boundaries: false,
width: 2048,
reduced_spaces: true,
}
}
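// Convenience accessors on `dyn PlanNode` for the common fields stored in `PlanBase`.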
impl dyn PlanNode {
pub fn id(&self) -> PlanNodeId {
self.plan_base().id()
}
pub fn ctx(&self) -> OptimizerContextRef {
self.plan_base().ctx().clone()
}
pub fn schema(&self) -> &Schema {
self.plan_base().schema()
}
pub fn stream_key(&self) -> Option<&[usize]> {
self.plan_base().stream_key()
}
pub fn functional_dependency(&self) -> &FunctionalDependencySet {
self.plan_base().functional_dependency()
}
}
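/// The recursion depth at which [`PLAN_TOO_DEEP_NOTICE`] is sent to the user while a plan is
/// being serialized.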
pub const PLAN_DEPTH_THRESHOLD: usize = 30;
pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
Consider simplifying or splitting the query if you encounter any issues.";
impl dyn PlanNode {
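/// Serializes this plan node and its inputs into a stream plan protobuf.
///
/// Table scan, CDC table scan, source scan and share nodes are converted via their own ad-hoc
/// implementations; all other nodes serialize their body and recurse into their inputs.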
pub fn to_stream_prost(
&self,
state: &mut BuildFragmentGraphState,
) -> SchedulerResult<PbStreamPlan> {
recursive::tracker!().recurse(|t| {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}
use stream::prelude::*;
if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
return stream_cdc_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_source_scan) = self.as_stream_source_scan() {
return stream_source_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_share) = self.as_stream_share() {
return stream_share.adhoc_to_stream_prost(state);
}
let node = Some(self.try_to_stream_prost_body(state)?);
let input = self
.inputs()
.into_iter()
.map(|plan| plan.to_stream_prost(state))
.try_collect()?;
Ok(PbStreamPlan {
input,
identity: self.explain_myself_to_string(),
node_body: node,
operator_id: self.id().0 as _,
stream_key: self
.stream_key()
.unwrap_or_default()
.iter()
.map(|x| *x as u32)
.collect(),
fields: self.schema().to_prost(),
append_only: self.plan_base().append_only(),
})
})
}
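/// Serializes this plan node and its inputs into a batch plan protobuf, including identity
/// strings.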
pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
self.to_batch_prost_identity(true)
}
pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
recursive::tracker!().recurse(|t| {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}
let node_body = Some(self.try_to_batch_prost_body()?);
let children = self
.inputs()
.into_iter()
.map(|plan| plan.to_batch_prost_identity(identity))
.try_collect()?;
Ok(PbBatchPlan {
children,
identity: if identity {
self.explain_myself_to_string()
} else {
"".into()
},
node_body,
})
})
}
pub fn explain_myself_to_string(&self) -> String {
self.distill_to_string()
}
}
mod plan_base;
pub use plan_base::*;
#[macro_use]
mod plan_tree_node;
pub use plan_tree_node::*;
mod col_pruning;
pub use col_pruning::*;
mod expr_rewritable;
pub use expr_rewritable::*;
mod expr_visitable;
mod convert;
pub use convert::*;
mod eq_join_predicate;
pub use eq_join_predicate::*;
mod to_prost;
pub use to_prost::*;
mod predicate_pushdown;
pub use predicate_pushdown::*;
mod merge_eq_nodes;
pub use merge_eq_nodes::*;
pub mod batch;
pub mod generic;
pub mod stream;
pub use generic::{PlanAggCall, PlanAggCallDisplay};
mod batch_delete;
mod batch_exchange;
mod batch_expand;
mod batch_filter;
mod batch_group_topn;
mod batch_hash_agg;
mod batch_hash_join;
mod batch_hop_window;
mod batch_insert;
mod batch_limit;
mod batch_log_seq_scan;
mod batch_lookup_join;
mod batch_max_one_row;
mod batch_nested_loop_join;
mod batch_over_window;
mod batch_project;
mod batch_project_set;
mod batch_seq_scan;
mod batch_simple_agg;
mod batch_sort;
mod batch_sort_agg;
mod batch_source;
mod batch_sys_seq_scan;
mod batch_table_function;
mod batch_topn;
mod batch_union;
mod batch_update;
mod batch_values;
mod logical_agg;
mod logical_apply;
mod logical_cdc_scan;
mod logical_changelog;
mod logical_cte_ref;
mod logical_dedup;
mod logical_delete;
mod logical_except;
mod logical_expand;
mod logical_filter;
mod logical_hop_window;
mod logical_insert;
mod logical_intersect;
mod logical_join;
mod logical_kafka_scan;
mod logical_limit;
mod logical_max_one_row;
mod logical_multi_join;
mod logical_now;
mod logical_over_window;
mod logical_project;
mod logical_project_set;
mod logical_recursive_union;
mod logical_scan;
mod logical_share;
mod logical_source;
mod logical_sys_scan;
mod logical_table_function;
mod logical_topn;
mod logical_union;
mod logical_update;
mod logical_values;
mod stream_asof_join;
mod stream_changelog;
mod stream_dedup;
mod stream_delta_join;
mod stream_dml;
mod stream_dynamic_filter;
mod stream_eowc_over_window;
mod stream_exchange;
mod stream_expand;
mod stream_filter;
mod stream_fs_fetch;
mod stream_global_approx_percentile;
mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_join_common;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_now;
mod stream_over_window;
mod stream_project;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_row_merge;
mod stream_simple_agg;
mod stream_sink;
mod stream_sort;
mod stream_source;
mod stream_source_scan;
mod stream_stateless_simple_agg;
mod stream_table_scan;
mod stream_topn;
mod stream_values;
mod stream_watermark_filter;
mod batch_file_scan;
mod batch_iceberg_scan;
mod batch_kafka_scan;
mod batch_postgres_query;
mod batch_mysql_query;
mod derive;
mod logical_file_scan;
mod logical_iceberg_scan;
mod logical_postgres_query;
mod logical_mysql_query;
mod stream_cdc_table_scan;
mod stream_share;
mod stream_temporal_join;
mod stream_union;
pub mod utils;
pub use batch_delete::BatchDelete;
pub use batch_exchange::BatchExchange;
pub use batch_expand::BatchExpand;
pub use batch_file_scan::BatchFileScan;
pub use batch_filter::BatchFilter;
pub use batch_group_topn::BatchGroupTopN;
pub use batch_hash_agg::BatchHashAgg;
pub use batch_hash_join::BatchHashJoin;
pub use batch_hop_window::BatchHopWindow;
pub use batch_iceberg_scan::BatchIcebergScan;
pub use batch_insert::BatchInsert;
pub use batch_kafka_scan::BatchKafkaScan;
pub use batch_limit::BatchLimit;
pub use batch_log_seq_scan::BatchLogSeqScan;
pub use batch_lookup_join::BatchLookupJoin;
pub use batch_max_one_row::BatchMaxOneRow;
pub use batch_mysql_query::BatchMySqlQuery;
pub use batch_nested_loop_join::BatchNestedLoopJoin;
pub use batch_over_window::BatchOverWindow;
pub use batch_postgres_query::BatchPostgresQuery;
pub use batch_project::BatchProject;
pub use batch_project_set::BatchProjectSet;
pub use batch_seq_scan::BatchSeqScan;
pub use batch_simple_agg::BatchSimpleAgg;
pub use batch_sort::BatchSort;
pub use batch_sort_agg::BatchSortAgg;
pub use batch_source::BatchSource;
pub use batch_sys_seq_scan::BatchSysSeqScan;
pub use batch_table_function::BatchTableFunction;
pub use batch_topn::BatchTopN;
pub use batch_union::BatchUnion;
pub use batch_update::BatchUpdate;
pub use batch_values::BatchValues;
pub use logical_agg::LogicalAgg;
pub use logical_apply::LogicalApply;
pub use logical_cdc_scan::LogicalCdcScan;
pub use logical_changelog::LogicalChangeLog;
pub use logical_cte_ref::LogicalCteRef;
pub use logical_dedup::LogicalDedup;
pub use logical_delete::LogicalDelete;
pub use logical_except::LogicalExcept;
pub use logical_expand::LogicalExpand;
pub use logical_file_scan::LogicalFileScan;
pub use logical_filter::LogicalFilter;
pub use logical_hop_window::LogicalHopWindow;
pub use logical_iceberg_scan::LogicalIcebergScan;
pub use logical_insert::LogicalInsert;
pub use logical_intersect::LogicalIntersect;
pub use logical_join::LogicalJoin;
pub use logical_kafka_scan::LogicalKafkaScan;
pub use logical_limit::LogicalLimit;
pub use logical_max_one_row::LogicalMaxOneRow;
pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder};
pub use logical_mysql_query::LogicalMySqlQuery;
pub use logical_now::LogicalNow;
pub use logical_over_window::LogicalOverWindow;
pub use logical_postgres_query::LogicalPostgresQuery;
pub use logical_project::LogicalProject;
pub use logical_project_set::LogicalProjectSet;
pub use logical_recursive_union::LogicalRecursiveUnion;
pub use logical_scan::LogicalScan;
pub use logical_share::LogicalShare;
pub use logical_source::LogicalSource;
pub use logical_sys_scan::LogicalSysScan;
pub use logical_table_function::LogicalTableFunction;
pub use logical_topn::LogicalTopN;
pub use logical_union::LogicalUnion;
pub use logical_update::LogicalUpdate;
pub use logical_values::LogicalValues;
pub use stream_asof_join::StreamAsOfJoin;
pub use stream_cdc_table_scan::StreamCdcTableScan;
pub use stream_changelog::StreamChangeLog;
pub use stream_dedup::StreamDedup;
pub use stream_delta_join::StreamDeltaJoin;
pub use stream_dml::StreamDml;
pub use stream_dynamic_filter::StreamDynamicFilter;
pub use stream_eowc_over_window::StreamEowcOverWindow;
pub use stream_exchange::StreamExchange;
pub use stream_expand::StreamExpand;
pub use stream_filter::StreamFilter;
pub use stream_fs_fetch::StreamFsFetch;
pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
use stream_join_common::StreamJoinCommon;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_row_merge::StreamRowMerge;
pub use stream_share::StreamShare;
pub use stream_simple_agg::StreamSimpleAgg;
pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
pub use stream_sort::StreamEowcSort;
pub use stream_source::StreamSource;
pub use stream_source_scan::StreamSourceScan;
pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
pub use stream_table_scan::StreamTableScan;
pub use stream_temporal_join::StreamTemporalJoin;
pub use stream_topn::StreamTopN;
pub use stream_union::StreamUnion;
pub use stream_values::StreamValues;
pub use stream_watermark_filter::StreamWatermarkFilter;
use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_rewriter::PlanCloner;
use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};
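/// `for_all_plan_nodes` includes all plan nodes of every convention. When adding a new plan
/// node, register it both here and in the macro for its convention below.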
#[macro_export]
macro_rules! for_all_plan_nodes {
($macro:ident) => {
$macro! {
{ Logical, Agg }
, { Logical, Apply }
, { Logical, Filter }
, { Logical, Project }
, { Logical, Scan }
, { Logical, CdcScan }
, { Logical, SysScan }
, { Logical, Source }
, { Logical, Insert }
, { Logical, Delete }
, { Logical, Update }
, { Logical, Join }
, { Logical, Values }
, { Logical, Limit }
, { Logical, TopN }
, { Logical, HopWindow }
, { Logical, TableFunction }
, { Logical, MultiJoin }
, { Logical, Expand }
, { Logical, ProjectSet }
, { Logical, Union }
, { Logical, OverWindow }
, { Logical, Share }
, { Logical, Now }
, { Logical, Dedup }
, { Logical, Intersect }
, { Logical, Except }
, { Logical, MaxOneRow }
, { Logical, KafkaScan }
, { Logical, IcebergScan }
, { Logical, RecursiveUnion }
, { Logical, CteRef }
, { Logical, ChangeLog }
, { Logical, FileScan }
, { Logical, PostgresQuery }
, { Logical, MySqlQuery }
, { Batch, SimpleAgg }
, { Batch, HashAgg }
, { Batch, SortAgg }
, { Batch, Project }
, { Batch, Filter }
, { Batch, Insert }
, { Batch, Delete }
, { Batch, Update }
, { Batch, SeqScan }
, { Batch, SysSeqScan }
, { Batch, LogSeqScan }
, { Batch, HashJoin }
, { Batch, NestedLoopJoin }
, { Batch, Values }
, { Batch, Sort }
, { Batch, Exchange }
, { Batch, Limit }
, { Batch, TopN }
, { Batch, HopWindow }
, { Batch, TableFunction }
, { Batch, Expand }
, { Batch, LookupJoin }
, { Batch, ProjectSet }
, { Batch, Union }
, { Batch, GroupTopN }
, { Batch, Source }
, { Batch, OverWindow }
, { Batch, MaxOneRow }
, { Batch, KafkaScan }
, { Batch, IcebergScan }
, { Batch, FileScan }
, { Batch, PostgresQuery }
, { Batch, MySqlQuery }
, { Stream, Project }
, { Stream, Filter }
, { Stream, TableScan }
, { Stream, CdcTableScan }
, { Stream, Sink }
, { Stream, Source }
, { Stream, SourceScan }
, { Stream, HashJoin }
, { Stream, Exchange }
, { Stream, HashAgg }
, { Stream, SimpleAgg }
, { Stream, StatelessSimpleAgg }
, { Stream, Materialize }
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, GroupTopN }
, { Stream, Union }
, { Stream, RowIdGen }
, { Stream, Dml }
, { Stream, Now }
, { Stream, Share }
, { Stream, WatermarkFilter }
, { Stream, TemporalJoin }
, { Stream, Values }
, { Stream, Dedup }
, { Stream, EowcOverWindow }
, { Stream, EowcSort }
, { Stream, OverWindow }
, { Stream, FsFetch }
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, RowMerge }
, { Stream, AsOfJoin }
}
};
}
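/// `for_logical_plan_nodes` includes all plan nodes with the logical convention.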
#[macro_export]
macro_rules! for_logical_plan_nodes {
($macro:ident) => {
$macro! {
{ Logical, Agg }
, { Logical, Apply }
, { Logical, Filter }
, { Logical, Project }
, { Logical, Scan }
, { Logical, CdcScan }
, { Logical, SysScan }
, { Logical, Source }
, { Logical, Insert }
, { Logical, Delete }
, { Logical, Update }
, { Logical, Join }
, { Logical, Values }
, { Logical, Limit }
, { Logical, TopN }
, { Logical, HopWindow }
, { Logical, TableFunction }
, { Logical, MultiJoin }
, { Logical, Expand }
, { Logical, ProjectSet }
, { Logical, Union }
, { Logical, OverWindow }
, { Logical, Share }
, { Logical, Now }
, { Logical, Dedup }
, { Logical, Intersect }
, { Logical, Except }
, { Logical, MaxOneRow }
, { Logical, KafkaScan }
, { Logical, IcebergScan }
, { Logical, RecursiveUnion }
, { Logical, CteRef }
, { Logical, ChangeLog }
, { Logical, FileScan }
, { Logical, PostgresQuery }
, { Logical, MySqlQuery }
}
};
}
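/// `for_batch_plan_nodes` includes all plan nodes with the batch convention.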
#[macro_export]
macro_rules! for_batch_plan_nodes {
($macro:ident) => {
$macro! {
{ Batch, SimpleAgg }
, { Batch, HashAgg }
, { Batch, SortAgg }
, { Batch, Project }
, { Batch, Filter }
, { Batch, SeqScan }
, { Batch, SysSeqScan }
, { Batch, LogSeqScan }
, { Batch, HashJoin }
, { Batch, NestedLoopJoin }
, { Batch, Values }
, { Batch, Limit }
, { Batch, Sort }
, { Batch, TopN }
, { Batch, Exchange }
, { Batch, Insert }
, { Batch, Delete }
, { Batch, Update }
, { Batch, HopWindow }
, { Batch, TableFunction }
, { Batch, Expand }
, { Batch, LookupJoin }
, { Batch, ProjectSet }
, { Batch, Union }
, { Batch, GroupTopN }
, { Batch, Source }
, { Batch, OverWindow }
, { Batch, MaxOneRow }
, { Batch, KafkaScan }
, { Batch, IcebergScan }
, { Batch, FileScan }
, { Batch, PostgresQuery }
, { Batch, MySqlQuery }
}
};
}
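/// `for_stream_plan_nodes` includes all plan nodes with the stream convention.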
#[macro_export]
macro_rules! for_stream_plan_nodes {
($macro:ident) => {
$macro! {
{ Stream, Project }
, { Stream, Filter }
, { Stream, HashJoin }
, { Stream, Exchange }
, { Stream, TableScan }
, { Stream, CdcTableScan }
, { Stream, Sink }
, { Stream, Source }
, { Stream, SourceScan }
, { Stream, HashAgg }
, { Stream, SimpleAgg }
, { Stream, StatelessSimpleAgg }
, { Stream, Materialize }
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, GroupTopN }
, { Stream, Union }
, { Stream, RowIdGen }
, { Stream, Dml }
, { Stream, Now }
, { Stream, Share }
, { Stream, WatermarkFilter }
, { Stream, TemporalJoin }
, { Stream, Values }
, { Stream, Dedup }
, { Stream, EowcOverWindow }
, { Stream, EowcSort }
, { Stream, OverWindow }
, { Stream, FsFetch }
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, RowMerge }
, { Stream, AsOfJoin }
}
};
}
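/// Generates the [`PlanNodeType`] enum and implements [`PlanNodeMeta`] and
/// `Deref<Target = PlanBase<_>>` for every plan node.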
macro_rules! impl_plan_node_meta {
($( { $convention:ident, $name:ident }),*) => {
paste!{
#[derive(Copy, Clone, PartialEq, Debug, Hash, Eq, Serialize)]
pub enum PlanNodeType {
$( [<$convention $name>] ),*
}
$(impl PlanNodeMeta for [<$convention $name>] {
type Convention = $convention;
const NODE_TYPE: PlanNodeType = PlanNodeType::[<$convention $name>];
fn plan_base(&self) -> &PlanBase<$convention> {
&self.base
}
fn plan_base_ref(&self) -> PlanBaseRef<'_> {
PlanBaseRef::$convention(&self.base)
}
}
impl Deref for [<$convention $name>] {
type Target = PlanBase<$convention>;
fn deref(&self) -> &Self::Target {
&self.base
}
})*
}
}
}
for_all_plan_nodes! { impl_plan_node_meta }
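/// Implements the marker [`PlanNode`] trait for every plan node.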
macro_rules! impl_plan_node {
($({ $convention:ident, $name:ident }),*) => {
paste!{
$(impl PlanNode for [<$convention $name>] { })*
}
}
}
for_all_plan_nodes! { impl_plan_node }
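/// Generates a downcast helper on `dyn PlanNode` for every plan node, e.g. `as_logical_share()`
/// and `as_stream_table_scan()`.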
macro_rules! impl_down_cast_fn {
($( { $convention:ident, $name:ident }),*) => {
paste!{
impl dyn PlanNode {
$( pub fn [< as_$convention:snake _ $name:snake>](&self) -> Option<&[<$convention $name>]> {
self.downcast_ref::<[<$convention $name>]>()
} )*
}
}
}
}
for_all_plan_nodes! { impl_down_cast_fn }