risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42        stage_name: &str,
43    ) -> Result<PlanRef> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage,
69    ) -> Result<PlanRef> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage,
80    ) -> Result<PlanRef> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
/// A named group of rules together with the order in which they are applied to a plan.
///
/// Instances are consumed by `PlanRef::optimize_by_rules` and
/// `PlanRef::optimize_by_rules_until_fix_point`.
pub struct OptimizationStage {
    /// Label written to the explain trace when the stage applied at least one rule.
    stage_name: String,
    /// Rules handed to the `HeuristicOptimizer` for this stage.
    rules: Vec<BoxedRule>,
    /// Traversal order handed to the `HeuristicOptimizer` for this stage.
    apply_order: ApplyOrder,
}
96
97impl OptimizationStage {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Namespace type for the logical optimization entry points; it carries no state and all of
/// its functions are associated functions.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply postgres query rule next
141            TableFunctionToPostgresQueryRule::create(),
142            // Apply mysql query rule next
143            TableFunctionToMySqlQueryRule::create(),
144            // Apply project set rule last
145            TableFunctionToProjectSetRule::create(),
146        ],
147        ApplyOrder::TopDown,
148    )
149});
150
151static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
152    OptimizationStage::new(
153        "Table Function To FileScan",
154        vec![TableFunctionToFileScanRule::create()],
155        ApplyOrder::TopDown,
156    )
157});
158
159static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
160    OptimizationStage::new(
161        "Table Function To PostgresQuery",
162        vec![TableFunctionToPostgresQueryRule::create()],
163        ApplyOrder::TopDown,
164    )
165});
166
167static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
168    OptimizationStage::new(
169        "Table Function To MySQL",
170        vec![TableFunctionToMySqlQueryRule::create()],
171        ApplyOrder::TopDown,
172    )
173});
174
175static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
176    LazyLock::new(|| {
177        OptimizationStage::new(
178            "Table Function To Internal Backfill Progress",
179            vec![TableFunctionToInternalBackfillProgressRule::create()],
180            ApplyOrder::TopDown,
181        )
182    });
183
184static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
185    OptimizationStage::new(
186        "Values Extract Project",
187        vec![ValuesExtractProjectRule::create()],
188        ApplyOrder::TopDown,
189    )
190});
191
192static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
193    OptimizationStage::new(
194        "Simple Unnesting",
195        vec![
196            // Eliminate max one row
197            MaxOneRowEliminateRule::create(),
198            // Convert apply to join.
199            ApplyToJoinRule::create(),
200            // Pull correlated predicates up the algebra tree to unnest simple subquery.
201            PullUpCorrelatedPredicateRule::create(),
202            PullUpCorrelatedPredicateAggRule::create(),
203        ],
204        ApplyOrder::BottomUp,
205    )
206});
207
208static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
209    OptimizationStage::new(
210        "Set Operation Merge",
211        vec![
212            UnionMergeRule::create(),
213            IntersectMergeRule::create(),
214            ExceptMergeRule::create(),
215        ],
216        ApplyOrder::BottomUp,
217    )
218});
219
220static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
221    LazyLock::new(|| {
222        OptimizationStage::new(
223            "General Unnesting(Translate Apply)",
224            vec![TranslateApplyRule::create(true)],
225            ApplyOrder::TopDown,
226        )
227    });
228
229static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
230    LazyLock::new(|| {
231        OptimizationStage::new(
232            "General Unnesting(Translate Apply)",
233            vec![TranslateApplyRule::create(false)],
234            ApplyOrder::TopDown,
235        )
236    });
237
238static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
239    OptimizationStage::new(
240        "General Unnesting(Push Down Apply)",
241        vec![
242            ApplyEliminateRule::create(),
243            ApplyAggTransposeRule::create(),
244            ApplyDedupTransposeRule::create(),
245            ApplyFilterTransposeRule::create(),
246            ApplyProjectTransposeRule::create(),
247            ApplyProjectSetTransposeRule::create(),
248            ApplyTopNTransposeRule::create(),
249            ApplyLimitTransposeRule::create(),
250            ApplyJoinTransposeRule::create(),
251            ApplyUnionTransposeRule::create(),
252            ApplyOverWindowTransposeRule::create(),
253            ApplyExpandTransposeRule::create(),
254            ApplyHopWindowTransposeRule::create(),
255            CrossJoinEliminateRule::create(),
256            ApplyShareEliminateRule::create(),
257        ],
258        ApplyOrder::TopDown,
259    )
260});
261
262static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
263    OptimizationStage::new(
264        "To MultiJoin",
265        vec![MergeMultiJoinRule::create()],
266        ApplyOrder::TopDown,
267    )
268});
269
270static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
271    OptimizationStage::new(
272        "Join Ordering".to_owned(),
273        vec![LeftDeepTreeJoinOrderingRule::create()],
274        ApplyOrder::TopDown,
275    )
276});
277
278static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
279    OptimizationStage::new(
280        "Join Ordering".to_owned(),
281        vec![BushyTreeJoinOrderingRule::create()],
282        ApplyOrder::TopDown,
283    )
284});
285
286static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287    OptimizationStage::new(
288        "Push down filter with now into a left semijoin",
289        vec![
290            SplitNowAndRule::create(),
291            SplitNowOrRule::create(),
292            FilterWithNowToJoinRule::create(),
293        ],
294        ApplyOrder::TopDown,
295    )
296});
297
298static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
299    OptimizationStage::new(
300        "Push down the calculation of inputs of join's condition",
301        vec![PushCalculationOfJoinRule::create()],
302        ApplyOrder::TopDown,
303    )
304});
305
306static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
307    OptimizationStage::new(
308        "Convert Distinct Aggregation",
309        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
310        ApplyOrder::TopDown,
311    )
312});
313
314static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
315    OptimizationStage::new(
316        "Convert Distinct Aggregation",
317        vec![
318            UnionToDistinctRule::create(),
319            DistinctAggRule::create(false),
320        ],
321        ApplyOrder::TopDown,
322    )
323});
324
325static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
326    OptimizationStage::new(
327        "Simplify Aggregation",
328        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
329        ApplyOrder::TopDown,
330    )
331});
332
333static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
334    OptimizationStage::new(
335        "Join Commute".to_owned(),
336        vec![JoinCommuteRule::create()],
337        ApplyOrder::TopDown,
338    )
339});
340
341static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
342    OptimizationStage::new(
343        "Constant Output Operator Remove",
344        vec![EmptyAggRemoveRule::create()],
345        ApplyOrder::TopDown,
346    )
347});
348
349static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
350    OptimizationStage::new(
351        "Project Remove",
352        vec![
353            // merge should be applied before eliminate
354            ProjectMergeRule::create(),
355            ProjectEliminateRule::create(),
356            TrivialProjectToValuesRule::create(),
357            UnionInputValuesMergeRule::create(),
358            JoinProjectTransposeRule::create(),
359            // project-join merge should be applied after merge
360            // eliminate and to values
361            ProjectJoinMergeRule::create(),
362            AggProjectMergeRule::create(),
363        ],
364        ApplyOrder::BottomUp,
365    )
366});
367
368static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
369    OptimizationStage::new(
370        "Split Over Window",
371        vec![OverWindowSplitRule::create()],
372        ApplyOrder::TopDown,
373    )
374});
375
376// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
377// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
378// 2. should be after merge the multiple projects
379static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
380    OptimizationStage::new(
381        "Convert Over Window",
382        vec![
383            ProjectMergeRule::create(),
384            ProjectEliminateRule::create(),
385            TrivialProjectToValuesRule::create(),
386            UnionInputValuesMergeRule::create(),
387            OverWindowToAggAndJoinRule::create(),
388            OverWindowToTopNRule::create(),
389        ],
390        ApplyOrder::TopDown,
391    )
392});
393
394static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
395    OptimizationStage::new(
396        "Merge Over Window",
397        vec![OverWindowMergeRule::create()],
398        ApplyOrder::TopDown,
399    )
400});
401
402static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
403    OptimizationStage::new(
404        "Rewrite Like Expr",
405        vec![RewriteLikeExprRule::create()],
406        ApplyOrder::TopDown,
407    )
408});
409
410static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
411    OptimizationStage::new(
412        "TopN/SimpleAgg on Index",
413        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
414        ApplyOrder::TopDown,
415    )
416});
417
418static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
419    OptimizationStage::new(
420        "Void always-false filter's downstream",
421        vec![AlwaysFalseFilterRule::create()],
422        ApplyOrder::TopDown,
423    )
424});
425
426static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
427    OptimizationStage::new(
428        "Push Down Limit",
429        vec![LimitPushDownRule::create()],
430        ApplyOrder::TopDown,
431    )
432});
433
434static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
435    OptimizationStage::new(
436        "Pull Up Hop",
437        vec![PullUpHopRule::create()],
438        ApplyOrder::BottomUp,
439    )
440});
441
442static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
443    OptimizationStage::new(
444        "Set Operation To Join",
445        vec![
446            IntersectToSemiJoinRule::create(),
447            ExceptToAntiJoinRule::create(),
448        ],
449        ApplyOrder::BottomUp,
450    )
451});
452
453static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
454    OptimizationStage::new(
455        "Grouping Sets",
456        vec![
457            GroupingSetsToExpandRule::create(),
458            ExpandToProjectRule::create(),
459        ],
460        ApplyOrder::TopDown,
461    )
462});
463
464static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
465    OptimizationStage::new(
466        "Common Sub Expression Extract",
467        vec![CommonSubExprExtractRule::create()],
468        ApplyOrder::TopDown,
469    )
470});
471
472static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
473    OptimizationStage::new(
474        "Logical Filter Expression Simplify",
475        vec![LogicalFilterExpressionSimplifyRule::create()],
476        ApplyOrder::TopDown,
477    )
478});
479
480static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
481    OptimizationStage::new(
482        "Rewrite Source For Batch",
483        vec![
484            SourceToKafkaScanRule::create(),
485            SourceToIcebergScanRule::create(),
486        ],
487        ApplyOrder::TopDown,
488    )
489});
490
491impl LogicalOptimizer {
492    pub fn predicate_pushdown(
493        plan: PlanRef,
494        explain_trace: bool,
495        ctx: &OptimizerContextRef,
496    ) -> PlanRef {
497        let plan = plan.predicate_pushdown(
498            Condition::true_cond(),
499            &mut PredicatePushdownContext::new(plan.clone()),
500        );
501        if explain_trace {
502            ctx.trace("Predicate Push Down:");
503            ctx.trace(plan.explain_to_string());
504        }
505        plan
506    }
507
508    pub fn subquery_unnesting(
509        mut plan: PlanRef,
510        enable_share_plan: bool,
511        explain_trace: bool,
512        ctx: &OptimizerContextRef,
513    ) -> Result<PlanRef> {
514        // Bail our if no apply operators.
515        if !has_logical_apply(plan.clone()) {
516            return Ok(plan);
517        }
518        // Simple Unnesting.
519        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
520        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
521        // Predicate push down before translate apply, because we need to calculate the domain
522        // and predicate push down can reduce the size of domain.
523        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
524        // In order to unnest values with correlated input ref, we need to extract project first.
525        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
526        // General Unnesting.
527        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
528        // join.
529        plan = if enable_share_plan {
530            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
531        } else {
532            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
533        };
534        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
535
536        // Check if all `Apply`s are eliminated and the subquery is unnested.
537        plan.check_apply_elimination()?;
538
539        Ok(plan)
540    }
541
542    pub fn column_pruning(
543        mut plan: PlanRef,
544        explain_trace: bool,
545        ctx: &OptimizerContextRef,
546    ) -> PlanRef {
547        let required_cols = (0..plan.schema().len()).collect_vec();
548        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
549        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
550        // Column pruning may introduce additional projects, and filter can be pushed again.
551        if explain_trace {
552            ctx.trace("Prune Columns:");
553            ctx.trace(plan.explain_to_string());
554        }
555
556        if column_pruning_ctx.need_second_round() {
557            // Second round of column pruning and reuse the column pruning context.
558            // Try to replace original share operator with the new one.
559            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
560            if explain_trace {
561                ctx.trace("Prune Columns (For DAG):");
562                ctx.trace(plan.explain_to_string());
563            }
564        }
565        plan
566    }
567
568    pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
569        // If now() and proctime() are not found, bail out.
570        let mut v = NowProcTimeFinder::default();
571        plan.visit_exprs_recursive(&mut v);
572        if !v.has() {
573            return plan;
574        }
575
576        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
577
578        let plan = plan.rewrite_exprs_recursive(&mut v);
579
580        if ctx.is_explain_trace() {
581            ctx.trace("Inline Now and ProcTime:");
582            ctx.trace(plan.explain_to_string());
583        }
584        plan
585    }
586
587    pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
588        let ctx = plan.ctx();
589        let explain_trace = ctx.is_explain_trace();
590
591        if explain_trace {
592            ctx.trace("Begin:");
593            ctx.trace(plan.explain_to_string());
594        }
595
596        // Convert grouping sets at first because other agg rule can't handle grouping sets.
597        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
598        // Remove nodes with constant output.
599        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
600        // Remove project to make common sub-plan sharing easier.
601        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
602
603        // If share plan is disable, we need to remove all the share operator generated by the
604        // binder, e.g. CTE and View. However, we still need to share source to ensure self
605        // source join can return correct result.
606        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
607        if enable_share_plan {
608            // Common sub-plan sharing.
609            plan = plan.common_subplan_sharing();
610            plan = plan.prune_share();
611            if explain_trace {
612                ctx.trace("Common Sub-plan Sharing:");
613                ctx.trace(plan.explain_to_string());
614            }
615        } else {
616            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
617
618            // Replace source to share source.
619            // Perform share source at the beginning so that we can benefit from predicate pushdown
620            // and column pruning for the share operator.
621            plan = ShareSourceRewriter::share_source(plan);
622            if explain_trace {
623                ctx.trace("Share Source:");
624                ctx.trace(plan.explain_to_string());
625            }
626        }
627        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
628        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
629        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
630        // Should be applied before converting table function to project set.
631        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
632        // In order to unnest a table function, we need to convert it into a `project_set` first.
633        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
634
635        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
636        if has_logical_max_one_row(plan.clone()) {
637            // `MaxOneRow` is currently only used for the runtime check of
638            // scalar subqueries, while it's not supported in streaming mode, so
639            // we raise a precise error here.
640            bail!("Scalar subquery might produce more than one row.");
641        }
642
643        // Same to batch plan optimization, this rule shall be applied before
644        // predicate push down
645        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
646
647        // Predicate Push-down
648        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
649
650        if plan.ctx().session_ctx().config().enable_join_ordering() {
651            // Merge inner joins and intermediate filters into multijoin
652            // This rule assumes that filters have already been pushed down near to
653            // their relevant joins.
654            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
655
656            // Reorder multijoin into join tree.
657            if plan
658                .ctx()
659                .session_ctx()
660                .config()
661                .streaming_enable_bushy_join()
662            {
663                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
664            } else {
665                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
666            }
667        }
668
669        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
670        // conditions into a filter above the multijoin.
671        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
672
673        // For stream, push down predicates with now into a left-semi join
674        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
675
676        // Push down the calculation of inputs of join's condition.
677        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
678
679        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
680        // Must push down predicates again after split over window so that OverWindow can be
681        // optimized to TopN.
682        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
683        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
684        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
685
686        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
687        // TODO: better naming of the OptimizationStage
688        // Convert distinct aggregates.
689        plan = if force_split_distinct_agg {
690            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
691        } else {
692            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
693        };
694
695        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
696
697        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
698
699        // Do a final column pruning and predicate pushing down to clean up the plan.
700        plan = Self::column_pruning(plan, explain_trace, &ctx);
701        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
702
703        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
704        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
705
706        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
707
708        #[cfg(debug_assertions)]
709        InputRefValidator.validate(plan.clone());
710
711        if ctx.is_explain_logical() {
712            match ctx.explain_format() {
713                ExplainFormat::Text => {
714                    ctx.store_logical(plan.explain_to_string());
715                }
716                ExplainFormat::Json => {
717                    ctx.store_logical(plan.explain_to_json());
718                }
719                ExplainFormat::Xml => {
720                    ctx.store_logical(plan.explain_to_xml());
721                }
722                ExplainFormat::Yaml => {
723                    ctx.store_logical(plan.explain_to_yaml());
724                }
725                ExplainFormat::Dot => {
726                    ctx.store_logical(plan.explain_to_dot());
727                }
728            }
729        }
730
731        Ok(plan)
732    }
733
734    pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
735        let ctx = plan.ctx();
736        let explain_trace = ctx.is_explain_trace();
737
738        if explain_trace {
739            ctx.trace("Begin:");
740            ctx.trace(plan.explain_to_string());
741        }
742
743        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
744        plan = Self::inline_now_proc_time(plan, &ctx);
745
746        // Convert the dag back to the tree, because we don't support DAG plan for batch.
747        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
748
749        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
750        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
751        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
752        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
753        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
754        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
755        // Table function should be converted into `file_scan` before `project_set`.
756        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
757        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
758        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
759        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
760        // In order to unnest a table function, we need to convert it into a `project_set` first.
761        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
762
763        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
764
765        // Filter simplification must be applied before predicate push-down
766        // otherwise the filter for some nodes (e.g., `LogicalScan`)
767        // may not be properly applied.
768        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
769
770        // Predicate Push-down
771        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
772        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
773
774        if plan.ctx().session_ctx().config().enable_join_ordering() {
775            // Merge inner joins and intermediate filters into multijoin
776            // This rule assumes that filters have already been pushed down near to
777            // their relevant joins.
778            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
779
780            // Reorder multijoin into left-deep join tree.
781            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
782        }
783
784        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
785        // conditions into a filter above the multijoin.
786        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
787            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
788            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
789        }
790
791        // Push down the calculation of inputs of join's condition.
792        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
793
794        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
795        // Must push down predicates again after split over window so that OverWindow can be
796        // optimized to TopN.
797        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
798            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
799            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
800        }
801        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
802        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
803
804        // Convert distinct aggregates.
805        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
806
807        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
808
809        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
810
811        // Do a final column pruning and predicate pushing down to clean up the plan.
812        plan = Self::column_pruning(plan, explain_trace, &ctx);
813        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
814            (#[allow(unused_assignments)]
815            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
816            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
817        }
818
819        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
820        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
821
822        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
823
824        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
825
826        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
827
828        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
829
830        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
831
832        #[cfg(debug_assertions)]
833        InputRefValidator.validate(plan.clone());
834
835        if ctx.is_explain_logical() {
836            match ctx.explain_format() {
837                ExplainFormat::Text => {
838                    ctx.store_logical(plan.explain_to_string());
839                }
840                ExplainFormat::Json => {
841                    ctx.store_logical(plan.explain_to_json());
842                }
843                ExplainFormat::Xml => {
844                    ctx.store_logical(plan.explain_to_xml());
845                }
846                ExplainFormat::Yaml => {
847                    ctx.store_logical(plan.explain_to_yaml());
848                }
849                ExplainFormat::Dot => {
850                    ctx.store_logical(plan.explain_to_dot());
851                }
852            }
853        }
854
855        Ok(plan)
856    }
857}