risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42        stage_name: &str,
43    ) -> Result<PlanRef> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage,
69    ) -> Result<PlanRef> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage,
80    ) -> Result<PlanRef> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage {
92    stage_name: String,
93    rules: Vec<BoxedRule>,
94    apply_order: ApplyOrder,
95}
96
97impl OptimizationStage {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Entry point for logical plan optimization.
///
/// The type carries no state; the optimization passes in this file are exposed as associated
/// functions that take the plan (and, where needed, the optimizer context) as arguments.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply postgres query rule next
139            TableFunctionToPostgresQueryRule::create(),
140            // Apply mysql query rule next
141            TableFunctionToMySqlQueryRule::create(),
142            // Apply project set rule last
143            TableFunctionToProjectSetRule::create(),
144        ],
145        ApplyOrder::TopDown,
146    )
147});
148
149static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
150    OptimizationStage::new(
151        "Table Function To FileScan",
152        vec![TableFunctionToFileScanRule::create()],
153        ApplyOrder::TopDown,
154    )
155});
156
157static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
158    OptimizationStage::new(
159        "Table Function To PostgresQuery",
160        vec![TableFunctionToPostgresQueryRule::create()],
161        ApplyOrder::TopDown,
162    )
163});
164
165static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
166    OptimizationStage::new(
167        "Table Function To MySQL",
168        vec![TableFunctionToMySqlQueryRule::create()],
169        ApplyOrder::TopDown,
170    )
171});
172
173static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
174    OptimizationStage::new(
175        "Values Extract Project",
176        vec![ValuesExtractProjectRule::create()],
177        ApplyOrder::TopDown,
178    )
179});
180
181static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
182    OptimizationStage::new(
183        "Simple Unnesting",
184        vec![
185            // Eliminate max one row
186            MaxOneRowEliminateRule::create(),
187            // Convert apply to join.
188            ApplyToJoinRule::create(),
189            // Pull correlated predicates up the algebra tree to unnest simple subquery.
190            PullUpCorrelatedPredicateRule::create(),
191            PullUpCorrelatedPredicateAggRule::create(),
192        ],
193        ApplyOrder::BottomUp,
194    )
195});
196
197static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
198    OptimizationStage::new(
199        "Set Operation Merge",
200        vec![
201            UnionMergeRule::create(),
202            IntersectMergeRule::create(),
203            ExceptMergeRule::create(),
204        ],
205        ApplyOrder::BottomUp,
206    )
207});
208
209static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
210    LazyLock::new(|| {
211        OptimizationStage::new(
212            "General Unnesting(Translate Apply)",
213            vec![TranslateApplyRule::create(true)],
214            ApplyOrder::TopDown,
215        )
216    });
217
218static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
219    LazyLock::new(|| {
220        OptimizationStage::new(
221            "General Unnesting(Translate Apply)",
222            vec![TranslateApplyRule::create(false)],
223            ApplyOrder::TopDown,
224        )
225    });
226
227static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
228    OptimizationStage::new(
229        "General Unnesting(Push Down Apply)",
230        vec![
231            ApplyEliminateRule::create(),
232            ApplyAggTransposeRule::create(),
233            ApplyDedupTransposeRule::create(),
234            ApplyFilterTransposeRule::create(),
235            ApplyProjectTransposeRule::create(),
236            ApplyProjectSetTransposeRule::create(),
237            ApplyTopNTransposeRule::create(),
238            ApplyLimitTransposeRule::create(),
239            ApplyJoinTransposeRule::create(),
240            ApplyUnionTransposeRule::create(),
241            ApplyOverWindowTransposeRule::create(),
242            ApplyExpandTransposeRule::create(),
243            ApplyHopWindowTransposeRule::create(),
244            CrossJoinEliminateRule::create(),
245            ApplyShareEliminateRule::create(),
246        ],
247        ApplyOrder::TopDown,
248    )
249});
250
251static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
252    OptimizationStage::new(
253        "To MultiJoin",
254        vec![MergeMultiJoinRule::create()],
255        ApplyOrder::TopDown,
256    )
257});
258
259static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
260    OptimizationStage::new(
261        "Join Ordering".to_owned(),
262        vec![LeftDeepTreeJoinOrderingRule::create()],
263        ApplyOrder::TopDown,
264    )
265});
266
267static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
268    OptimizationStage::new(
269        "Join Ordering".to_owned(),
270        vec![BushyTreeJoinOrderingRule::create()],
271        ApplyOrder::TopDown,
272    )
273});
274
275static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
276    OptimizationStage::new(
277        "Push down filter with now into a left semijoin",
278        vec![
279            SplitNowAndRule::create(),
280            SplitNowOrRule::create(),
281            FilterWithNowToJoinRule::create(),
282        ],
283        ApplyOrder::TopDown,
284    )
285});
286
287static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
288    OptimizationStage::new(
289        "Push down the calculation of inputs of join's condition",
290        vec![PushCalculationOfJoinRule::create()],
291        ApplyOrder::TopDown,
292    )
293});
294
295static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
296    OptimizationStage::new(
297        "Convert Distinct Aggregation",
298        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
299        ApplyOrder::TopDown,
300    )
301});
302
303static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
304    OptimizationStage::new(
305        "Convert Distinct Aggregation",
306        vec![
307            UnionToDistinctRule::create(),
308            DistinctAggRule::create(false),
309        ],
310        ApplyOrder::TopDown,
311    )
312});
313
314static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
315    OptimizationStage::new(
316        "Simplify Aggregation",
317        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
318        ApplyOrder::TopDown,
319    )
320});
321
322static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
323    OptimizationStage::new(
324        "Join Commute".to_owned(),
325        vec![JoinCommuteRule::create()],
326        ApplyOrder::TopDown,
327    )
328});
329
330static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
331    OptimizationStage::new(
332        "Constant Output Operator Remove",
333        vec![EmptyAggRemoveRule::create()],
334        ApplyOrder::TopDown,
335    )
336});
337
338static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
339    OptimizationStage::new(
340        "Project Remove",
341        vec![
342            // merge should be applied before eliminate
343            ProjectMergeRule::create(),
344            ProjectEliminateRule::create(),
345            TrivialProjectToValuesRule::create(),
346            UnionInputValuesMergeRule::create(),
347            JoinProjectTransposeRule::create(),
348            // project-join merge should be applied after merge
349            // eliminate and to values
350            ProjectJoinMergeRule::create(),
351            AggProjectMergeRule::create(),
352        ],
353        ApplyOrder::BottomUp,
354    )
355});
356
357static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
358    OptimizationStage::new(
359        "Split Over Window",
360        vec![OverWindowSplitRule::create()],
361        ApplyOrder::TopDown,
362    )
363});
364
365// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
366// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
367// 2. should be after merge the multiple projects
368static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
369    OptimizationStage::new(
370        "Convert Over Window",
371        vec![
372            ProjectMergeRule::create(),
373            ProjectEliminateRule::create(),
374            TrivialProjectToValuesRule::create(),
375            UnionInputValuesMergeRule::create(),
376            OverWindowToAggAndJoinRule::create(),
377            OverWindowToTopNRule::create(),
378        ],
379        ApplyOrder::TopDown,
380    )
381});
382
383static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
384    OptimizationStage::new(
385        "Merge Over Window",
386        vec![OverWindowMergeRule::create()],
387        ApplyOrder::TopDown,
388    )
389});
390
391static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
392    OptimizationStage::new(
393        "Rewrite Like Expr",
394        vec![RewriteLikeExprRule::create()],
395        ApplyOrder::TopDown,
396    )
397});
398
399static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
400    OptimizationStage::new(
401        "TopN/SimpleAgg on Index",
402        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
403        ApplyOrder::TopDown,
404    )
405});
406
407static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
408    OptimizationStage::new(
409        "Void always-false filter's downstream",
410        vec![AlwaysFalseFilterRule::create()],
411        ApplyOrder::TopDown,
412    )
413});
414
415static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
416    OptimizationStage::new(
417        "Push Down Limit",
418        vec![LimitPushDownRule::create()],
419        ApplyOrder::TopDown,
420    )
421});
422
423static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
424    OptimizationStage::new(
425        "Pull Up Hop",
426        vec![PullUpHopRule::create()],
427        ApplyOrder::BottomUp,
428    )
429});
430
431static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
432    OptimizationStage::new(
433        "Set Operation To Join",
434        vec![
435            IntersectToSemiJoinRule::create(),
436            ExceptToAntiJoinRule::create(),
437        ],
438        ApplyOrder::BottomUp,
439    )
440});
441
442static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
443    OptimizationStage::new(
444        "Grouping Sets",
445        vec![
446            GroupingSetsToExpandRule::create(),
447            ExpandToProjectRule::create(),
448        ],
449        ApplyOrder::TopDown,
450    )
451});
452
453static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
454    OptimizationStage::new(
455        "Common Sub Expression Extract",
456        vec![CommonSubExprExtractRule::create()],
457        ApplyOrder::TopDown,
458    )
459});
460
461static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
462    OptimizationStage::new(
463        "Logical Filter Expression Simplify",
464        vec![LogicalFilterExpressionSimplifyRule::create()],
465        ApplyOrder::TopDown,
466    )
467});
468
469static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
470    OptimizationStage::new(
471        "Rewrite Source For Batch",
472        vec![
473            SourceToKafkaScanRule::create(),
474            SourceToIcebergScanRule::create(),
475        ],
476        ApplyOrder::TopDown,
477    )
478});
479
480impl LogicalOptimizer {
481    pub fn predicate_pushdown(
482        plan: PlanRef,
483        explain_trace: bool,
484        ctx: &OptimizerContextRef,
485    ) -> PlanRef {
486        let plan = plan.predicate_pushdown(
487            Condition::true_cond(),
488            &mut PredicatePushdownContext::new(plan.clone()),
489        );
490        if explain_trace {
491            ctx.trace("Predicate Push Down:");
492            ctx.trace(plan.explain_to_string());
493        }
494        plan
495    }
496
497    pub fn subquery_unnesting(
498        mut plan: PlanRef,
499        enable_share_plan: bool,
500        explain_trace: bool,
501        ctx: &OptimizerContextRef,
502    ) -> Result<PlanRef> {
503        // Bail our if no apply operators.
504        if !has_logical_apply(plan.clone()) {
505            return Ok(plan);
506        }
507        // Simple Unnesting.
508        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
509        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
510        // Predicate push down before translate apply, because we need to calculate the domain
511        // and predicate push down can reduce the size of domain.
512        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
513        // In order to unnest values with correlated input ref, we need to extract project first.
514        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
515        // General Unnesting.
516        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
517        // join.
518        plan = if enable_share_plan {
519            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
520        } else {
521            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
522        };
523        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
524
525        // Check if all `Apply`s are eliminated and the subquery is unnested.
526        plan.check_apply_elimination()?;
527
528        Ok(plan)
529    }
530
531    pub fn column_pruning(
532        mut plan: PlanRef,
533        explain_trace: bool,
534        ctx: &OptimizerContextRef,
535    ) -> PlanRef {
536        let required_cols = (0..plan.schema().len()).collect_vec();
537        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
538        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
539        // Column pruning may introduce additional projects, and filter can be pushed again.
540        if explain_trace {
541            ctx.trace("Prune Columns:");
542            ctx.trace(plan.explain_to_string());
543        }
544
545        if column_pruning_ctx.need_second_round() {
546            // Second round of column pruning and reuse the column pruning context.
547            // Try to replace original share operator with the new one.
548            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
549            if explain_trace {
550                ctx.trace("Prune Columns (For DAG):");
551                ctx.trace(plan.explain_to_string());
552            }
553        }
554        plan
555    }
556
557    pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
558        // If now() and proctime() are not found, bail out.
559        let mut v = NowProcTimeFinder::default();
560        plan.visit_exprs_recursive(&mut v);
561        if !v.has() {
562            return plan;
563        }
564
565        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
566
567        let plan = plan.rewrite_exprs_recursive(&mut v);
568
569        if ctx.is_explain_trace() {
570            ctx.trace("Inline Now and ProcTime:");
571            ctx.trace(plan.explain_to_string());
572        }
573        plan
574    }
575
    /// Runs the logical optimization pipeline for a streaming plan and returns the optimized
    /// plan.
    ///
    /// The stages run strictly in the order written below; several comments in the body call
    /// out ordering requirements between them. Intermediate plans are written to the explain
    /// trace when it is enabled, and the final plan is stored on the context in the requested
    /// explain format when logical explain is enabled.
    ///
    /// # Errors
    ///
    /// Propagates any error from a rule stage or from subquery unnesting, and bails with
    /// "Scalar subquery might produce more than one row." when a logical max-one-row node is
    /// still present after unnesting.
    pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert grouping sets at first because other agg rule can't handle grouping sets.
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        // Remove nodes with constant output.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        // Remove project to make common sub-plan sharing easier.
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        // If share plan is disabled, we need to remove all the share operators generated by the
        // binder, e.g. CTE and View. However, we still need to share source to ensure self
        // source join can return correct result.
        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
        if enable_share_plan {
            // Common sub-plan sharing.
            plan = plan.common_subplan_sharing();
            plan = plan.prune_share();
            if explain_trace {
                ctx.trace("Common Sub-plan Sharing:");
                ctx.trace(plan.explain_to_string());
            }
        } else {
            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

            // Replace source to share source.
            // Perform share source at the beginning so that we can benefit from predicate pushdown
            // and column pruning for the share operator.
            plan = ShareSourceRewriter::share_source(plan);
            if explain_trace {
                ctx.trace("Share Source:");
                ctx.trace(plan.explain_to_string());
            }
        }
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
        // Should be applied before converting table function to project set.
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        if has_logical_max_one_row(plan.clone()) {
            // `MaxOneRow` is currently only used for the runtime check of
            // scalar subqueries, while it's not supported in streaming mode, so
            // we raise a precise error here.
            bail!("Scalar subquery might produce more than one row.");
        }

        // Same as batch plan optimization, this rule shall be applied before
        // predicate push down
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into join tree.
            if plan
                .ctx()
                .session_ctx()
                .config()
                .streaming_enable_bushy_join()
            {
                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
            } else {
                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
            }
        }

        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
        // conditions into a filter above the multijoin.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // For stream, push down predicates with now into a left-semi join
        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
        // TODO: better naming of the OptimizationStage
        // Convert distinct aggregates. When the session forces splitting, the stage named
        // `..._FOR_BATCH` is used in this streaming pipeline as well.
        plan = if force_split_distinct_agg {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
        } else {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
        };

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // Input-ref validation only runs in builds with debug assertions.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        // Store the final logical plan in the format requested by the EXPLAIN statement.
        if ctx.is_explain_logical() {
            match ctx.explain_format() {
                ExplainFormat::Text => {
                    ctx.store_logical(plan.explain_to_string());
                }
                ExplainFormat::Json => {
                    ctx.store_logical(plan.explain_to_json());
                }
                ExplainFormat::Xml => {
                    ctx.store_logical(plan.explain_to_xml());
                }
                ExplainFormat::Yaml => {
                    ctx.store_logical(plan.explain_to_yaml());
                }
                ExplainFormat::Dot => {
                    ctx.store_logical(plan.explain_to_dot());
                }
            }
        }

        Ok(plan)
    }
722
    /// Runs the logical optimization pipeline for a batch plan and returns the optimized plan.
    ///
    /// The stages run strictly in the order written below. After the first predicate
    /// pushdown, each later pushdown is skipped unless the context's applied-rule counter has
    /// changed since the previous pushdown. Intermediate plans are written to the explain
    /// trace when it is enabled, and the final plan is stored on the context in the requested
    /// explain format when logical explain is enabled.
    ///
    /// # Errors
    ///
    /// Propagates any error from a rule stage or from subquery unnesting.
    pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
        plan = Self::inline_now_proc_time(plan, &ctx);

        // Convert the dag back to the tree, because we don't support DAG plan for batch.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
        // Table function should be converted into `file_scan` before `project_set`.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        // Share plan is passed as disabled here.
        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;

        // Filter simplification must be applied before predicate push-down
        // otherwise the filter for some nodes (e.g., `LogicalScan`)
        // may not be properly applied.
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        // Remember the applied-rule counter at the time of this pushdown; later pushdowns
        // are skipped if the counter has not moved since the previous one.
        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into left-deep join tree.
            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
        }

        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
        // conditions into a filter above the multijoin.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        // Convert distinct aggregates.
        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            // This is the last pushdown, so the stored counter is never read again; the
            // attribute silences the resulting unused-assignment lint.
            // NOTE(review): presumably kept so the bookkeeping stays uniform if another
            // pushdown is added later — confirm.
            (#[allow(unused_assignments)]
            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;

        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;

        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;

        // Run the DAG-to-tree conversion once more at the end of the pipeline.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        // Input-ref validation only runs in builds with debug assertions.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        // Store the final logical plan in the format requested by the EXPLAIN statement.
        if ctx.is_explain_logical() {
            match ctx.explain_format() {
                ExplainFormat::Text => {
                    ctx.store_logical(plan.explain_to_string());
                }
                ExplainFormat::Json => {
                    ctx.store_logical(plan.explain_to_json());
                }
                ExplainFormat::Xml => {
                    ctx.store_logical(plan.explain_to_xml());
                }
                ExplainFormat::Yaml => {
                    ctx.store_logical(plan.explain_to_yaml());
                }
                ExplainFormat::Dot => {
                    ctx.store_logical(plan.explain_to_dot());
                }
            }
        }

        Ok(plan)
    }
845}