use itertools::Itertools;
use risingwave_common::bail;
use thiserror_ext::AsReport as _;
use super::plan_node::RewriteExprsRecursive;
use super::plan_visitor::has_logical_max_one_row;
use crate::error::Result;
use crate::expr::NowProcTimeFinder;
use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
};
use crate::optimizer::plan_rewriter::ShareSourceRewriter;
#[cfg(debug_assertions)]
use crate::optimizer::plan_visitor::InputRefValidator;
use crate::optimizer::plan_visitor::{
has_logical_apply, HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor,
};
use crate::optimizer::rule::*;
use crate::optimizer::PlanRef;
use crate::utils::Condition;
use crate::{Explain, OptimizerContextRef};
impl PlanRef {
fn optimize_by_rules_inner(
self,
heuristic_optimizer: &mut HeuristicOptimizer<'_>,
stage_name: &str,
) -> Result<PlanRef> {
let ctx = self.ctx();
let result = heuristic_optimizer.optimize(self);
let stats = heuristic_optimizer.get_stats();
if ctx.is_explain_trace() && stats.has_applied_rule() {
ctx.trace(format!("{}:", stage_name));
ctx.trace(format!("{}", stats));
ctx.trace(match &result {
Ok(plan) => plan.explain_to_string(),
Err(error) => format!("Optimization failed: {}", error.as_report()),
});
}
ctx.add_rule_applied(stats.total_applied());
result
}
pub(crate) fn optimize_by_rules(
self,
OptimizationStage {
stage_name,
rules,
apply_order,
}: &OptimizationStage,
) -> Result<PlanRef> {
self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
}
pub(crate) fn optimize_by_rules_until_fix_point(
mut self,
OptimizationStage {
stage_name,
rules,
apply_order,
}: &OptimizationStage,
) -> Result<PlanRef> {
loop {
let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
if !heuristic_optimizer.get_stats().has_applied_rule() {
return Ok(self);
}
}
}
}
pub struct OptimizationStage {
stage_name: String,
rules: Vec<BoxedRule>,
apply_order: ApplyOrder,
}
impl OptimizationStage {
pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
where
S: Into<String>,
{
OptimizationStage {
stage_name: name.into(),
rules,
apply_order,
}
}
}
use std::sync::LazyLock;
use risingwave_sqlparser::ast::ExplainFormat;
pub struct LogicalOptimizer {}
static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"DAG To Tree",
vec![DagToTreeRule::create()],
ApplyOrder::TopDown,
)
});
static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert GENERATE_SERIES Ends With NOW",
vec![GenerateSeriesWithNowRule::create()],
ApplyOrder::TopDown,
)
});
static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function Convert",
vec![
TableFunctionToFileScanRule::create(),
TableFunctionToPostgresQueryRule::create(),
TableFunctionToMySqlQueryRule::create(),
TableFunctionToProjectSetRule::create(),
],
ApplyOrder::TopDown,
)
});
static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To FileScan",
vec![TableFunctionToFileScanRule::create()],
ApplyOrder::TopDown,
)
});
static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To PostgresQuery",
vec![TableFunctionToPostgresQueryRule::create()],
ApplyOrder::TopDown,
)
});
static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To MySQL",
vec![TableFunctionToMySqlQueryRule::create()],
ApplyOrder::TopDown,
)
});
static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Values Extract Project",
vec![ValuesExtractProjectRule::create()],
ApplyOrder::TopDown,
)
});
static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Simple Unnesting",
vec![
MaxOneRowEliminateRule::create(),
ApplyToJoinRule::create(),
PullUpCorrelatedPredicateRule::create(),
PullUpCorrelatedPredicateAggRule::create(),
],
ApplyOrder::BottomUp,
)
});
static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Set Operation Merge",
vec![
UnionMergeRule::create(),
IntersectMergeRule::create(),
ExceptMergeRule::create(),
],
ApplyOrder::BottomUp,
)
});
static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![TranslateApplyRule::create(true)],
ApplyOrder::TopDown,
)
});
static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![TranslateApplyRule::create(false)],
ApplyOrder::TopDown,
)
});
static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Push Down Apply)",
vec![
ApplyEliminateRule::create(),
ApplyAggTransposeRule::create(),
ApplyDedupTransposeRule::create(),
ApplyFilterTransposeRule::create(),
ApplyProjectTransposeRule::create(),
ApplyProjectSetTransposeRule::create(),
ApplyTopNTransposeRule::create(),
ApplyLimitTransposeRule::create(),
ApplyJoinTransposeRule::create(),
ApplyUnionTransposeRule::create(),
ApplyOverWindowTransposeRule::create(),
ApplyExpandTransposeRule::create(),
ApplyHopWindowTransposeRule::create(),
CrossJoinEliminateRule::create(),
ApplyShareEliminateRule::create(),
],
ApplyOrder::TopDown,
)
});
static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"To MultiJoin",
vec![MergeMultiJoinRule::create()],
ApplyOrder::TopDown,
)
});
static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Join Ordering".to_string(),
vec![LeftDeepTreeJoinOrderingRule::create()],
ApplyOrder::TopDown,
)
});
static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Join Ordering".to_string(),
vec![BushyTreeJoinOrderingRule::create()],
ApplyOrder::TopDown,
)
});
static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Push down filter with now into a left semijoin",
vec![
SplitNowAndRule::create(),
SplitNowOrRule::create(),
FilterWithNowToJoinRule::create(),
],
ApplyOrder::TopDown,
)
});
static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Push down the calculation of inputs of join's condition",
vec![PushCalculationOfJoinRule::create()],
ApplyOrder::TopDown,
)
});
static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert Distinct Aggregation",
vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
ApplyOrder::TopDown,
)
});
static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert Distinct Aggregation",
vec![
UnionToDistinctRule::create(),
DistinctAggRule::create(false),
],
ApplyOrder::TopDown,
)
});
static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Simplify Aggregation",
vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
ApplyOrder::TopDown,
)
});
static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Join Commute".to_string(),
vec![JoinCommuteRule::create()],
ApplyOrder::TopDown,
)
});
static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Project Remove",
vec![
ProjectMergeRule::create(),
ProjectEliminateRule::create(),
TrivialProjectToValuesRule::create(),
UnionInputValuesMergeRule::create(),
JoinProjectTransposeRule::create(),
ProjectJoinMergeRule::create(),
AggProjectMergeRule::create(),
],
ApplyOrder::BottomUp,
)
});
static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Split Over Window",
vec![OverWindowSplitRule::create()],
ApplyOrder::TopDown,
)
});
static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert Over Window",
vec![
ProjectMergeRule::create(),
ProjectEliminateRule::create(),
TrivialProjectToValuesRule::create(),
UnionInputValuesMergeRule::create(),
OverWindowToAggAndJoinRule::create(),
OverWindowToTopNRule::create(),
],
ApplyOrder::TopDown,
)
});
static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Merge Over Window",
vec![OverWindowMergeRule::create()],
ApplyOrder::TopDown,
)
});
static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Rewrite Like Expr",
vec![RewriteLikeExprRule::create()],
ApplyOrder::TopDown,
)
});
static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"TopN/SimpleAgg on Index",
vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
ApplyOrder::TopDown,
)
});
static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Void always-false filter's downstream",
vec![AlwaysFalseFilterRule::create()],
ApplyOrder::TopDown,
)
});
static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Push Down Limit",
vec![LimitPushDownRule::create()],
ApplyOrder::TopDown,
)
});
static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Pull Up Hop",
vec![PullUpHopRule::create()],
ApplyOrder::BottomUp,
)
});
static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Set Operation To Join",
vec![
IntersectToSemiJoinRule::create(),
ExceptToAntiJoinRule::create(),
],
ApplyOrder::BottomUp,
)
});
static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Grouping Sets",
vec![
GroupingSetsToExpandRule::create(),
ExpandToProjectRule::create(),
],
ApplyOrder::TopDown,
)
});
static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Common Sub Expression Extract",
vec![CommonSubExprExtractRule::create()],
ApplyOrder::TopDown,
)
});
static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Logical Filter Expression Simplify",
vec![LogicalFilterExpressionSimplifyRule::create()],
ApplyOrder::TopDown,
)
});
static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Rewrite Source For Batch",
vec![
SourceToKafkaScanRule::create(),
SourceToIcebergScanRule::create(),
],
ApplyOrder::TopDown,
)
});
impl LogicalOptimizer {
pub fn predicate_pushdown(
plan: PlanRef,
explain_trace: bool,
ctx: &OptimizerContextRef,
) -> PlanRef {
let plan = plan.predicate_pushdown(
Condition::true_cond(),
&mut PredicatePushdownContext::new(plan.clone()),
);
if explain_trace {
ctx.trace("Predicate Push Down:");
ctx.trace(plan.explain_to_string());
}
plan
}
pub fn subquery_unnesting(
mut plan: PlanRef,
enable_share_plan: bool,
explain_trace: bool,
ctx: &OptimizerContextRef,
) -> Result<PlanRef> {
if !has_logical_apply(plan.clone()) {
return Ok(plan);
}
plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
plan = Self::predicate_pushdown(plan, explain_trace, ctx);
plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
plan = if enable_share_plan {
plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
} else {
plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
};
plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
plan.check_apply_elimination()?;
Ok(plan)
}
pub fn column_pruning(
mut plan: PlanRef,
explain_trace: bool,
ctx: &OptimizerContextRef,
) -> PlanRef {
let required_cols = (0..plan.schema().len()).collect_vec();
let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
if explain_trace {
ctx.trace("Prune Columns:");
ctx.trace(plan.explain_to_string());
}
if column_pruning_ctx.need_second_round() {
plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
if explain_trace {
ctx.trace("Prune Columns (For DAG):");
ctx.trace(plan.explain_to_string());
}
}
plan
}
pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
let mut v = NowProcTimeFinder::default();
plan.visit_exprs_recursive(&mut v);
if !v.has() {
return plan;
}
let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
let plan = plan.rewrite_exprs_recursive(&mut v);
if ctx.is_explain_trace() {
ctx.trace("Inline Now and ProcTime:");
ctx.trace(plan.explain_to_string());
}
plan
}
pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
let ctx = plan.ctx();
let explain_trace = ctx.is_explain_trace();
if explain_trace {
ctx.trace("Begin:");
ctx.trace(plan.explain_to_string());
}
plan = plan.optimize_by_rules(&GROUPING_SETS)?;
plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
if enable_share_plan {
plan = plan.common_subplan_sharing();
plan = plan.prune_share();
if explain_trace {
ctx.trace("Common Sub-plan Sharing:");
ctx.trace(plan.explain_to_string());
}
} else {
plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
plan = ShareSourceRewriter::share_source(plan);
if explain_trace {
ctx.trace("Share Source:");
ctx.trace(plan.explain_to_string());
}
}
plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
if has_logical_max_one_row(plan.clone()) {
bail!("Scalar subquery might produce more than one row.");
}
plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
if plan.ctx().session_ctx().config().enable_join_ordering() {
plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
if plan
.ctx()
.session_ctx()
.config()
.streaming_enable_bushy_join()
{
plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
} else {
plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
}
}
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
plan = if force_split_distinct_agg {
plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
} else {
plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
};
plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
plan = Self::column_pruning(plan, explain_trace, &ctx);
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());
if ctx.is_explain_logical() {
match ctx.explain_format() {
ExplainFormat::Text => {
ctx.store_logical(plan.explain_to_string());
}
ExplainFormat::Json => {
ctx.store_logical(plan.explain_to_json());
}
ExplainFormat::Xml => {
ctx.store_logical(plan.explain_to_xml());
}
ExplainFormat::Yaml => {
ctx.store_logical(plan.explain_to_yaml());
}
ExplainFormat::Dot => {
ctx.store_logical(plan.explain_to_dot());
}
}
}
Ok(plan)
}
pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
let ctx = plan.ctx();
let explain_trace = ctx.is_explain_trace();
if explain_trace {
ctx.trace("Begin:");
ctx.trace(plan.explain_to_string());
}
plan = Self::inline_now_proc_time(plan, &ctx);
plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
plan = plan.optimize_by_rules(&GROUPING_SETS)?;
plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
if plan.ctx().session_ctx().config().enable_join_ordering() {
plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
}
if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
}
plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
}
plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
plan = Self::column_pruning(plan, explain_trace, &ctx);
if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
(#[allow(unused_assignments)]
last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
}
plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());
if ctx.is_explain_logical() {
match ctx.explain_format() {
ExplainFormat::Text => {
ctx.store_logical(plan.explain_to_string());
}
ExplainFormat::Json => {
ctx.store_logical(plan.explain_to_json());
}
ExplainFormat::Xml => {
ctx.store_logical(plan.explain_to_xml());
}
ExplainFormat::Yaml => {
ctx.store_logical(plan.explain_to_yaml());
}
ExplainFormat::Dot => {
ctx.store_logical(plan.explain_to_dot());
}
}
}
Ok(plan)
}
}