risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42        stage_name: &str,
43    ) -> Result<PlanRef> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage,
69    ) -> Result<PlanRef> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage,
80    ) -> Result<PlanRef> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage {
92    stage_name: String,
93    rules: Vec<BoxedRule>,
94    apply_order: ApplyOrder,
95}
96
97impl OptimizationStage {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Field-less type that groups the logical optimization passes; everything is exposed through
/// associated functions that take the plan as an argument.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply postgres query rule next
139            TableFunctionToPostgresQueryRule::create(),
140            // Apply mysql query rule next
141            TableFunctionToMySqlQueryRule::create(),
142            // Apply project set rule last
143            TableFunctionToProjectSetRule::create(),
144        ],
145        ApplyOrder::TopDown,
146    )
147});
148
149static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
150    OptimizationStage::new(
151        "Table Function To FileScan",
152        vec![TableFunctionToFileScanRule::create()],
153        ApplyOrder::TopDown,
154    )
155});
156
157static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
158    OptimizationStage::new(
159        "Table Function To PostgresQuery",
160        vec![TableFunctionToPostgresQueryRule::create()],
161        ApplyOrder::TopDown,
162    )
163});
164
165static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
166    OptimizationStage::new(
167        "Table Function To MySQL",
168        vec![TableFunctionToMySqlQueryRule::create()],
169        ApplyOrder::TopDown,
170    )
171});
172
173static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
174    OptimizationStage::new(
175        "Values Extract Project",
176        vec![ValuesExtractProjectRule::create()],
177        ApplyOrder::TopDown,
178    )
179});
180
181static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
182    OptimizationStage::new(
183        "Simple Unnesting",
184        vec![
185            // Eliminate max one row
186            MaxOneRowEliminateRule::create(),
187            // Convert apply to join.
188            ApplyToJoinRule::create(),
189            // Pull correlated predicates up the algebra tree to unnest simple subquery.
190            PullUpCorrelatedPredicateRule::create(),
191            PullUpCorrelatedPredicateAggRule::create(),
192        ],
193        ApplyOrder::BottomUp,
194    )
195});
196
197static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
198    OptimizationStage::new(
199        "Set Operation Merge",
200        vec![
201            UnionMergeRule::create(),
202            IntersectMergeRule::create(),
203            ExceptMergeRule::create(),
204        ],
205        ApplyOrder::BottomUp,
206    )
207});
208
209static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
210    LazyLock::new(|| {
211        OptimizationStage::new(
212            "General Unnesting(Translate Apply)",
213            vec![TranslateApplyRule::create(true)],
214            ApplyOrder::TopDown,
215        )
216    });
217
218static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
219    LazyLock::new(|| {
220        OptimizationStage::new(
221            "General Unnesting(Translate Apply)",
222            vec![TranslateApplyRule::create(false)],
223            ApplyOrder::TopDown,
224        )
225    });
226
227static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
228    OptimizationStage::new(
229        "General Unnesting(Push Down Apply)",
230        vec![
231            ApplyEliminateRule::create(),
232            ApplyAggTransposeRule::create(),
233            ApplyDedupTransposeRule::create(),
234            ApplyFilterTransposeRule::create(),
235            ApplyProjectTransposeRule::create(),
236            ApplyProjectSetTransposeRule::create(),
237            ApplyTopNTransposeRule::create(),
238            ApplyLimitTransposeRule::create(),
239            ApplyJoinTransposeRule::create(),
240            ApplyUnionTransposeRule::create(),
241            ApplyOverWindowTransposeRule::create(),
242            ApplyExpandTransposeRule::create(),
243            ApplyHopWindowTransposeRule::create(),
244            CrossJoinEliminateRule::create(),
245            ApplyShareEliminateRule::create(),
246        ],
247        ApplyOrder::TopDown,
248    )
249});
250
251static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
252    OptimizationStage::new(
253        "To MultiJoin",
254        vec![MergeMultiJoinRule::create()],
255        ApplyOrder::TopDown,
256    )
257});
258
259static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
260    OptimizationStage::new(
261        "Join Ordering".to_owned(),
262        vec![LeftDeepTreeJoinOrderingRule::create()],
263        ApplyOrder::TopDown,
264    )
265});
266
267static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
268    OptimizationStage::new(
269        "Join Ordering".to_owned(),
270        vec![BushyTreeJoinOrderingRule::create()],
271        ApplyOrder::TopDown,
272    )
273});
274
275static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
276    OptimizationStage::new(
277        "Push down filter with now into a left semijoin",
278        vec![
279            SplitNowAndRule::create(),
280            SplitNowOrRule::create(),
281            FilterWithNowToJoinRule::create(),
282        ],
283        ApplyOrder::TopDown,
284    )
285});
286
287static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
288    OptimizationStage::new(
289        "Push down the calculation of inputs of join's condition",
290        vec![PushCalculationOfJoinRule::create()],
291        ApplyOrder::TopDown,
292    )
293});
294
295static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
296    OptimizationStage::new(
297        "Convert Distinct Aggregation",
298        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
299        ApplyOrder::TopDown,
300    )
301});
302
303static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
304    OptimizationStage::new(
305        "Convert Distinct Aggregation",
306        vec![
307            UnionToDistinctRule::create(),
308            DistinctAggRule::create(false),
309        ],
310        ApplyOrder::TopDown,
311    )
312});
313
314static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
315    OptimizationStage::new(
316        "Simplify Aggregation",
317        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
318        ApplyOrder::TopDown,
319    )
320});
321
322static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
323    OptimizationStage::new(
324        "Join Commute".to_owned(),
325        vec![JoinCommuteRule::create()],
326        ApplyOrder::TopDown,
327    )
328});
329
330static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
331    OptimizationStage::new(
332        "Project Remove",
333        vec![
334            // merge should be applied before eliminate
335            ProjectMergeRule::create(),
336            ProjectEliminateRule::create(),
337            TrivialProjectToValuesRule::create(),
338            UnionInputValuesMergeRule::create(),
339            JoinProjectTransposeRule::create(),
340            // project-join merge should be applied after merge
341            // eliminate and to values
342            ProjectJoinMergeRule::create(),
343            AggProjectMergeRule::create(),
344        ],
345        ApplyOrder::BottomUp,
346    )
347});
348
349static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
350    OptimizationStage::new(
351        "Split Over Window",
352        vec![OverWindowSplitRule::create()],
353        ApplyOrder::TopDown,
354    )
355});
356
357// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
358// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
359// 2. should be after merge the multiple projects
360static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
361    OptimizationStage::new(
362        "Convert Over Window",
363        vec![
364            ProjectMergeRule::create(),
365            ProjectEliminateRule::create(),
366            TrivialProjectToValuesRule::create(),
367            UnionInputValuesMergeRule::create(),
368            OverWindowToAggAndJoinRule::create(),
369            OverWindowToTopNRule::create(),
370        ],
371        ApplyOrder::TopDown,
372    )
373});
374
375static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
376    OptimizationStage::new(
377        "Merge Over Window",
378        vec![OverWindowMergeRule::create()],
379        ApplyOrder::TopDown,
380    )
381});
382
383static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
384    OptimizationStage::new(
385        "Rewrite Like Expr",
386        vec![RewriteLikeExprRule::create()],
387        ApplyOrder::TopDown,
388    )
389});
390
391static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
392    OptimizationStage::new(
393        "TopN/SimpleAgg on Index",
394        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
395        ApplyOrder::TopDown,
396    )
397});
398
399static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
400    OptimizationStage::new(
401        "Void always-false filter's downstream",
402        vec![AlwaysFalseFilterRule::create()],
403        ApplyOrder::TopDown,
404    )
405});
406
407static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
408    OptimizationStage::new(
409        "Push Down Limit",
410        vec![LimitPushDownRule::create()],
411        ApplyOrder::TopDown,
412    )
413});
414
415static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
416    OptimizationStage::new(
417        "Pull Up Hop",
418        vec![PullUpHopRule::create()],
419        ApplyOrder::BottomUp,
420    )
421});
422
423static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
424    OptimizationStage::new(
425        "Set Operation To Join",
426        vec![
427            IntersectToSemiJoinRule::create(),
428            ExceptToAntiJoinRule::create(),
429        ],
430        ApplyOrder::BottomUp,
431    )
432});
433
434static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
435    OptimizationStage::new(
436        "Grouping Sets",
437        vec![
438            GroupingSetsToExpandRule::create(),
439            ExpandToProjectRule::create(),
440        ],
441        ApplyOrder::TopDown,
442    )
443});
444
445static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
446    OptimizationStage::new(
447        "Common Sub Expression Extract",
448        vec![CommonSubExprExtractRule::create()],
449        ApplyOrder::TopDown,
450    )
451});
452
453static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
454    OptimizationStage::new(
455        "Logical Filter Expression Simplify",
456        vec![LogicalFilterExpressionSimplifyRule::create()],
457        ApplyOrder::TopDown,
458    )
459});
460
461static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
462    OptimizationStage::new(
463        "Rewrite Source For Batch",
464        vec![
465            SourceToKafkaScanRule::create(),
466            SourceToIcebergScanRule::create(),
467        ],
468        ApplyOrder::TopDown,
469    )
470});
471
472impl LogicalOptimizer {
473    pub fn predicate_pushdown(
474        plan: PlanRef,
475        explain_trace: bool,
476        ctx: &OptimizerContextRef,
477    ) -> PlanRef {
478        let plan = plan.predicate_pushdown(
479            Condition::true_cond(),
480            &mut PredicatePushdownContext::new(plan.clone()),
481        );
482        if explain_trace {
483            ctx.trace("Predicate Push Down:");
484            ctx.trace(plan.explain_to_string());
485        }
486        plan
487    }
488
489    pub fn subquery_unnesting(
490        mut plan: PlanRef,
491        enable_share_plan: bool,
492        explain_trace: bool,
493        ctx: &OptimizerContextRef,
494    ) -> Result<PlanRef> {
495        // Bail our if no apply operators.
496        if !has_logical_apply(plan.clone()) {
497            return Ok(plan);
498        }
499        // Simple Unnesting.
500        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
501        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
502        // Predicate push down before translate apply, because we need to calculate the domain
503        // and predicate push down can reduce the size of domain.
504        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
505        // In order to unnest values with correlated input ref, we need to extract project first.
506        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
507        // General Unnesting.
508        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
509        // join.
510        plan = if enable_share_plan {
511            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
512        } else {
513            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
514        };
515        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
516
517        // Check if all `Apply`s are eliminated and the subquery is unnested.
518        plan.check_apply_elimination()?;
519
520        Ok(plan)
521    }
522
523    pub fn column_pruning(
524        mut plan: PlanRef,
525        explain_trace: bool,
526        ctx: &OptimizerContextRef,
527    ) -> PlanRef {
528        let required_cols = (0..plan.schema().len()).collect_vec();
529        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
530        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
531        // Column pruning may introduce additional projects, and filter can be pushed again.
532        if explain_trace {
533            ctx.trace("Prune Columns:");
534            ctx.trace(plan.explain_to_string());
535        }
536
537        if column_pruning_ctx.need_second_round() {
538            // Second round of column pruning and reuse the column pruning context.
539            // Try to replace original share operator with the new one.
540            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
541            if explain_trace {
542                ctx.trace("Prune Columns (For DAG):");
543                ctx.trace(plan.explain_to_string());
544            }
545        }
546        plan
547    }
548
549    pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
550        // If now() and proctime() are not found, bail out.
551        let mut v = NowProcTimeFinder::default();
552        plan.visit_exprs_recursive(&mut v);
553        if !v.has() {
554            return plan;
555        }
556
557        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
558
559        let plan = plan.rewrite_exprs_recursive(&mut v);
560
561        if ctx.is_explain_trace() {
562            ctx.trace("Inline Now and ProcTime:");
563            ctx.trace(plan.explain_to_string());
564        }
565        plan
566    }
567
568    pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
569        let ctx = plan.ctx();
570        let explain_trace = ctx.is_explain_trace();
571
572        if explain_trace {
573            ctx.trace("Begin:");
574            ctx.trace(plan.explain_to_string());
575        }
576
577        // Convert grouping sets at first because other agg rule can't handle grouping sets.
578        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
579        // Remove project to make common sub-plan sharing easier.
580        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
581
582        // If share plan is disable, we need to remove all the share operator generated by the
583        // binder, e.g. CTE and View. However, we still need to share source to ensure self
584        // source join can return correct result.
585        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
586        if enable_share_plan {
587            // Common sub-plan sharing.
588            plan = plan.common_subplan_sharing();
589            plan = plan.prune_share();
590            if explain_trace {
591                ctx.trace("Common Sub-plan Sharing:");
592                ctx.trace(plan.explain_to_string());
593            }
594        } else {
595            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
596
597            // Replace source to share source.
598            // Perform share source at the beginning so that we can benefit from predicate pushdown
599            // and column pruning for the share operator.
600            plan = ShareSourceRewriter::share_source(plan);
601            if explain_trace {
602                ctx.trace("Share Source:");
603                ctx.trace(plan.explain_to_string());
604            }
605        }
606        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
607        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
608        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
609        // Should be applied before converting table function to project set.
610        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
611        // In order to unnest a table function, we need to convert it into a `project_set` first.
612        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
613
614        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
615        if has_logical_max_one_row(plan.clone()) {
616            // `MaxOneRow` is currently only used for the runtime check of
617            // scalar subqueries, while it's not supported in streaming mode, so
618            // we raise a precise error here.
619            bail!("Scalar subquery might produce more than one row.");
620        }
621
622        // Same to batch plan optimization, this rule shall be applied before
623        // predicate push down
624        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
625
626        // Predicate Push-down
627        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
628
629        if plan.ctx().session_ctx().config().enable_join_ordering() {
630            // Merge inner joins and intermediate filters into multijoin
631            // This rule assumes that filters have already been pushed down near to
632            // their relevant joins.
633            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
634
635            // Reorder multijoin into join tree.
636            if plan
637                .ctx()
638                .session_ctx()
639                .config()
640                .streaming_enable_bushy_join()
641            {
642                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
643            } else {
644                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
645            }
646        }
647
648        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
649        // conditions into a filter above the multijoin.
650        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
651
652        // For stream, push down predicates with now into a left-semi join
653        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
654
655        // Push down the calculation of inputs of join's condition.
656        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
657
658        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
659        // Must push down predicates again after split over window so that OverWindow can be
660        // optimized to TopN.
661        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
662        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
663        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
664
665        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
666        // TODO: better naming of the OptimizationStage
667        // Convert distinct aggregates.
668        plan = if force_split_distinct_agg {
669            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
670        } else {
671            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
672        };
673
674        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
675
676        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
677
678        // Do a final column pruning and predicate pushing down to clean up the plan.
679        plan = Self::column_pruning(plan, explain_trace, &ctx);
680        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
681
682        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
683
684        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
685
686        #[cfg(debug_assertions)]
687        InputRefValidator.validate(plan.clone());
688
689        if ctx.is_explain_logical() {
690            match ctx.explain_format() {
691                ExplainFormat::Text => {
692                    ctx.store_logical(plan.explain_to_string());
693                }
694                ExplainFormat::Json => {
695                    ctx.store_logical(plan.explain_to_json());
696                }
697                ExplainFormat::Xml => {
698                    ctx.store_logical(plan.explain_to_xml());
699                }
700                ExplainFormat::Yaml => {
701                    ctx.store_logical(plan.explain_to_yaml());
702                }
703                ExplainFormat::Dot => {
704                    ctx.store_logical(plan.explain_to_dot());
705                }
706            }
707        }
708
709        Ok(plan)
710    }
711
712    pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
713        let ctx = plan.ctx();
714        let explain_trace = ctx.is_explain_trace();
715
716        if explain_trace {
717            ctx.trace("Begin:");
718            ctx.trace(plan.explain_to_string());
719        }
720
721        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
722        plan = Self::inline_now_proc_time(plan, &ctx);
723
724        // Convert the dag back to the tree, because we don't support DAG plan for batch.
725        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
726
727        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
728        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
729        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
730        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
731        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
732        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
733        // Table function should be converted into `file_scan` before `project_set`.
734        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
735        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
736        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
737        // In order to unnest a table function, we need to convert it into a `project_set` first.
738        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
739
740        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
741
742        // Filter simplification must be applied before predicate push-down
743        // otherwise the filter for some nodes (e.g., `LogicalScan`)
744        // may not be properly applied.
745        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
746
747        // Predicate Push-down
748        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
749        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
750
751        if plan.ctx().session_ctx().config().enable_join_ordering() {
752            // Merge inner joins and intermediate filters into multijoin
753            // This rule assumes that filters have already been pushed down near to
754            // their relevant joins.
755            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
756
757            // Reorder multijoin into left-deep join tree.
758            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
759        }
760
761        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
762        // conditions into a filter above the multijoin.
763        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
764            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
765            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
766        }
767
768        // Push down the calculation of inputs of join's condition.
769        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
770
771        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
772        // Must push down predicates again after split over window so that OverWindow can be
773        // optimized to TopN.
774        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
775            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
776            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
777        }
778        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
779        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
780
781        // Convert distinct aggregates.
782        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
783
784        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
785
786        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
787
788        // Do a final column pruning and predicate pushing down to clean up the plan.
789        plan = Self::column_pruning(plan, explain_trace, &ctx);
790        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
791            (#[allow(unused_assignments)]
792            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
793            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
794        }
795
796        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
797
798        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
799
800        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
801
802        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
803
804        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
805
806        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
807
808        #[cfg(debug_assertions)]
809        InputRefValidator.validate(plan.clone());
810
811        if ctx.is_explain_logical() {
812            match ctx.explain_format() {
813                ExplainFormat::Text => {
814                    ctx.store_logical(plan.explain_to_string());
815                }
816                ExplainFormat::Json => {
817                    ctx.store_logical(plan.explain_to_json());
818                }
819                ExplainFormat::Xml => {
820                    ctx.store_logical(plan.explain_to_xml());
821                }
822                ExplainFormat::Yaml => {
823                    ctx.store_logical(plan.explain_to_yaml());
824                }
825                ExplainFormat::Dot => {
826                    ctx.store_logical(plan.explain_to_dot());
827                }
828            }
829        }
830
831        Ok(plan)
832    }
833}