risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42        stage_name: &str,
43    ) -> Result<PlanRef> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage,
69    ) -> Result<PlanRef> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage,
80    ) -> Result<PlanRef> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage {
92    stage_name: String,
93    rules: Vec<BoxedRule>,
94    apply_order: ApplyOrder,
95}
96
97impl OptimizationStage {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Namespace type for the logical-plan optimization pipelines (streaming and batch).
///
/// It carries no state; all functionality lives in associated functions.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply internal source backfill progress rule next
141            TableFunctionToInternalSourceBackfillProgressRule::create(),
142            // Apply postgres query rule next
143            TableFunctionToPostgresQueryRule::create(),
144            // Apply mysql query rule next
145            TableFunctionToMySqlQueryRule::create(),
146            // Apply project set rule last
147            TableFunctionToProjectSetRule::create(),
148        ],
149        ApplyOrder::TopDown,
150    )
151});
152
153static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
154    OptimizationStage::new(
155        "Table Function To FileScan",
156        vec![TableFunctionToFileScanRule::create()],
157        ApplyOrder::TopDown,
158    )
159});
160
161static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
162    OptimizationStage::new(
163        "Table Function To PostgresQuery",
164        vec![TableFunctionToPostgresQueryRule::create()],
165        ApplyOrder::TopDown,
166    )
167});
168
169static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
170    OptimizationStage::new(
171        "Table Function To MySQL",
172        vec![TableFunctionToMySqlQueryRule::create()],
173        ApplyOrder::TopDown,
174    )
175});
176
177static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
178    LazyLock::new(|| {
179        OptimizationStage::new(
180            "Table Function To Internal Backfill Progress",
181            vec![TableFunctionToInternalBackfillProgressRule::create()],
182            ApplyOrder::TopDown,
183        )
184    });
185
186static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
187    LazyLock::new(|| {
188        OptimizationStage::new(
189            "Table Function To Internal Source Backfill Progress",
190            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
191            ApplyOrder::TopDown,
192        )
193    });
194
195static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
196    OptimizationStage::new(
197        "Values Extract Project",
198        vec![ValuesExtractProjectRule::create()],
199        ApplyOrder::TopDown,
200    )
201});
202
203static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
204    OptimizationStage::new(
205        "Simple Unnesting",
206        vec![
207            // Eliminate max one row
208            MaxOneRowEliminateRule::create(),
209            // Convert apply to join.
210            ApplyToJoinRule::create(),
211            // Pull correlated predicates up the algebra tree to unnest simple subquery.
212            PullUpCorrelatedPredicateRule::create(),
213            PullUpCorrelatedPredicateAggRule::create(),
214        ],
215        ApplyOrder::BottomUp,
216    )
217});
218
219static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
220    OptimizationStage::new(
221        "Set Operation Merge",
222        vec![
223            UnionMergeRule::create(),
224            IntersectMergeRule::create(),
225            ExceptMergeRule::create(),
226        ],
227        ApplyOrder::BottomUp,
228    )
229});
230
231static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
232    LazyLock::new(|| {
233        OptimizationStage::new(
234            "General Unnesting(Translate Apply)",
235            vec![TranslateApplyRule::create(true)],
236            ApplyOrder::TopDown,
237        )
238    });
239
240static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
241    LazyLock::new(|| {
242        OptimizationStage::new(
243            "General Unnesting(Translate Apply)",
244            vec![TranslateApplyRule::create(false)],
245            ApplyOrder::TopDown,
246        )
247    });
248
249static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
250    OptimizationStage::new(
251        "General Unnesting(Push Down Apply)",
252        vec![
253            ApplyEliminateRule::create(),
254            ApplyAggTransposeRule::create(),
255            ApplyDedupTransposeRule::create(),
256            ApplyFilterTransposeRule::create(),
257            ApplyProjectTransposeRule::create(),
258            ApplyProjectSetTransposeRule::create(),
259            ApplyTopNTransposeRule::create(),
260            ApplyLimitTransposeRule::create(),
261            ApplyJoinTransposeRule::create(),
262            ApplyUnionTransposeRule::create(),
263            ApplyOverWindowTransposeRule::create(),
264            ApplyExpandTransposeRule::create(),
265            ApplyHopWindowTransposeRule::create(),
266            CrossJoinEliminateRule::create(),
267            ApplyShareEliminateRule::create(),
268        ],
269        ApplyOrder::TopDown,
270    )
271});
272
273static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
274    OptimizationStage::new(
275        "To MultiJoin",
276        vec![MergeMultiJoinRule::create()],
277        ApplyOrder::TopDown,
278    )
279});
280
281static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
282    OptimizationStage::new(
283        "Join Ordering".to_owned(),
284        vec![LeftDeepTreeJoinOrderingRule::create()],
285        ApplyOrder::TopDown,
286    )
287});
288
289static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
290    OptimizationStage::new(
291        "Join Ordering".to_owned(),
292        vec![BushyTreeJoinOrderingRule::create()],
293        ApplyOrder::TopDown,
294    )
295});
296
297static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
298    OptimizationStage::new(
299        "Push down filter with now into a left semijoin",
300        vec![
301            SplitNowAndRule::create(),
302            SplitNowOrRule::create(),
303            FilterWithNowToJoinRule::create(),
304        ],
305        ApplyOrder::TopDown,
306    )
307});
308
309static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
310    OptimizationStage::new(
311        "Push down the calculation of inputs of join's condition",
312        vec![PushCalculationOfJoinRule::create()],
313        ApplyOrder::TopDown,
314    )
315});
316
317static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
318    OptimizationStage::new(
319        "Convert Distinct Aggregation",
320        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
321        ApplyOrder::TopDown,
322    )
323});
324
325static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
326    OptimizationStage::new(
327        "Convert Distinct Aggregation",
328        vec![
329            UnionToDistinctRule::create(),
330            DistinctAggRule::create(false),
331        ],
332        ApplyOrder::TopDown,
333    )
334});
335
336static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
337    OptimizationStage::new(
338        "Simplify Aggregation",
339        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
340        ApplyOrder::TopDown,
341    )
342});
343
344static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
345    OptimizationStage::new(
346        "Join Commute".to_owned(),
347        vec![JoinCommuteRule::create()],
348        ApplyOrder::TopDown,
349    )
350});
351
352static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
353    OptimizationStage::new(
354        "Constant Output Operator Remove",
355        vec![EmptyAggRemoveRule::create()],
356        ApplyOrder::TopDown,
357    )
358});
359
360static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
361    OptimizationStage::new(
362        "Project Remove",
363        vec![
364            // merge should be applied before eliminate
365            ProjectMergeRule::create(),
366            ProjectEliminateRule::create(),
367            TrivialProjectToValuesRule::create(),
368            UnionInputValuesMergeRule::create(),
369            JoinProjectTransposeRule::create(),
370            // project-join merge should be applied after merge
371            // eliminate and to values
372            ProjectJoinMergeRule::create(),
373            AggProjectMergeRule::create(),
374        ],
375        ApplyOrder::BottomUp,
376    )
377});
378
379static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
380    OptimizationStage::new(
381        "Split Over Window",
382        vec![OverWindowSplitRule::create()],
383        ApplyOrder::TopDown,
384    )
385});
386
387// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
388// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
389// 2. should be after merge the multiple projects
390static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
391    OptimizationStage::new(
392        "Convert Over Window",
393        vec![
394            ProjectMergeRule::create(),
395            ProjectEliminateRule::create(),
396            TrivialProjectToValuesRule::create(),
397            UnionInputValuesMergeRule::create(),
398            OverWindowToAggAndJoinRule::create(),
399            OverWindowToTopNRule::create(),
400        ],
401        ApplyOrder::TopDown,
402    )
403});
404
405static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
406    OptimizationStage::new(
407        "Merge Over Window",
408        vec![OverWindowMergeRule::create()],
409        ApplyOrder::TopDown,
410    )
411});
412
413static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
414    OptimizationStage::new(
415        "Rewrite Like Expr",
416        vec![RewriteLikeExprRule::create()],
417        ApplyOrder::TopDown,
418    )
419});
420
421static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
422    OptimizationStage::new(
423        "TopN/SimpleAgg on Index",
424        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
425        ApplyOrder::TopDown,
426    )
427});
428
429static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
430    OptimizationStage::new(
431        "Void always-false filter's downstream",
432        vec![AlwaysFalseFilterRule::create()],
433        ApplyOrder::TopDown,
434    )
435});
436
437static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
438    OptimizationStage::new(
439        "Push Down Limit",
440        vec![LimitPushDownRule::create()],
441        ApplyOrder::TopDown,
442    )
443});
444
445static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
446    OptimizationStage::new(
447        "Pull Up Hop",
448        vec![PullUpHopRule::create()],
449        ApplyOrder::BottomUp,
450    )
451});
452
453static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
454    OptimizationStage::new(
455        "Set Operation To Join",
456        vec![
457            IntersectToSemiJoinRule::create(),
458            ExceptToAntiJoinRule::create(),
459        ],
460        ApplyOrder::BottomUp,
461    )
462});
463
464static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
465    OptimizationStage::new(
466        "Grouping Sets",
467        vec![
468            GroupingSetsToExpandRule::create(),
469            ExpandToProjectRule::create(),
470        ],
471        ApplyOrder::TopDown,
472    )
473});
474
475static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
476    OptimizationStage::new(
477        "Common Sub Expression Extract",
478        vec![CommonSubExprExtractRule::create()],
479        ApplyOrder::TopDown,
480    )
481});
482
483static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
484    OptimizationStage::new(
485        "Logical Filter Expression Simplify",
486        vec![LogicalFilterExpressionSimplifyRule::create()],
487        ApplyOrder::TopDown,
488    )
489});
490
491static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
492    OptimizationStage::new(
493        "Rewrite Source For Batch",
494        vec![
495            SourceToKafkaScanRule::create(),
496            SourceToIcebergScanRule::create(),
497        ],
498        ApplyOrder::TopDown,
499    )
500});
501
502impl LogicalOptimizer {
503    pub fn predicate_pushdown(
504        plan: PlanRef,
505        explain_trace: bool,
506        ctx: &OptimizerContextRef,
507    ) -> PlanRef {
508        let plan = plan.predicate_pushdown(
509            Condition::true_cond(),
510            &mut PredicatePushdownContext::new(plan.clone()),
511        );
512        if explain_trace {
513            ctx.trace("Predicate Push Down:");
514            ctx.trace(plan.explain_to_string());
515        }
516        plan
517    }
518
519    pub fn subquery_unnesting(
520        mut plan: PlanRef,
521        enable_share_plan: bool,
522        explain_trace: bool,
523        ctx: &OptimizerContextRef,
524    ) -> Result<PlanRef> {
525        // Bail our if no apply operators.
526        if !has_logical_apply(plan.clone()) {
527            return Ok(plan);
528        }
529        // Simple Unnesting.
530        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
531        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
532        // Predicate push down before translate apply, because we need to calculate the domain
533        // and predicate push down can reduce the size of domain.
534        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
535        // In order to unnest values with correlated input ref, we need to extract project first.
536        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
537        // General Unnesting.
538        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
539        // join.
540        plan = if enable_share_plan {
541            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
542        } else {
543            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
544        };
545        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
546
547        // Check if all `Apply`s are eliminated and the subquery is unnested.
548        plan.check_apply_elimination()?;
549
550        Ok(plan)
551    }
552
553    pub fn column_pruning(
554        mut plan: PlanRef,
555        explain_trace: bool,
556        ctx: &OptimizerContextRef,
557    ) -> PlanRef {
558        let required_cols = (0..plan.schema().len()).collect_vec();
559        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
560        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
561        // Column pruning may introduce additional projects, and filter can be pushed again.
562        if explain_trace {
563            ctx.trace("Prune Columns:");
564            ctx.trace(plan.explain_to_string());
565        }
566
567        if column_pruning_ctx.need_second_round() {
568            // Second round of column pruning and reuse the column pruning context.
569            // Try to replace original share operator with the new one.
570            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
571            if explain_trace {
572                ctx.trace("Prune Columns (For DAG):");
573                ctx.trace(plan.explain_to_string());
574            }
575        }
576        plan
577    }
578
579    pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
580        // If now() and proctime() are not found, bail out.
581        let mut v = NowProcTimeFinder::default();
582        plan.visit_exprs_recursive(&mut v);
583        if !v.has() {
584            return plan;
585        }
586
587        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
588
589        let plan = plan.rewrite_exprs_recursive(&mut v);
590
591        if ctx.is_explain_trace() {
592            ctx.trace("Inline Now and ProcTime:");
593            ctx.trace(plan.explain_to_string());
594        }
595        plan
596    }
597
598    pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
599        let ctx = plan.ctx();
600        let explain_trace = ctx.is_explain_trace();
601
602        if explain_trace {
603            ctx.trace("Begin:");
604            ctx.trace(plan.explain_to_string());
605        }
606
607        // Convert grouping sets at first because other agg rule can't handle grouping sets.
608        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
609        // Remove nodes with constant output.
610        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
611        // Remove project to make common sub-plan sharing easier.
612        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
613
614        // If share plan is disable, we need to remove all the share operator generated by the
615        // binder, e.g. CTE and View. However, we still need to share source to ensure self
616        // source join can return correct result.
617        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
618        if enable_share_plan {
619            // Common sub-plan sharing.
620            plan = plan.common_subplan_sharing();
621            plan = plan.prune_share();
622            if explain_trace {
623                ctx.trace("Common Sub-plan Sharing:");
624                ctx.trace(plan.explain_to_string());
625            }
626        } else {
627            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
628
629            // Replace source to share source.
630            // Perform share source at the beginning so that we can benefit from predicate pushdown
631            // and column pruning for the share operator.
632            plan = ShareSourceRewriter::share_source(plan);
633            if explain_trace {
634                ctx.trace("Share Source:");
635                ctx.trace(plan.explain_to_string());
636            }
637        }
638        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
639        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
640        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
641        // Should be applied before converting table function to project set.
642        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
643        // In order to unnest a table function, we need to convert it into a `project_set` first.
644        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
645
646        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
647        if has_logical_max_one_row(plan.clone()) {
648            // `MaxOneRow` is currently only used for the runtime check of
649            // scalar subqueries, while it's not supported in streaming mode, so
650            // we raise a precise error here.
651            bail!("Scalar subquery might produce more than one row.");
652        }
653
654        // Same to batch plan optimization, this rule shall be applied before
655        // predicate push down
656        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
657
658        // Predicate Push-down
659        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
660
661        if plan.ctx().session_ctx().config().enable_join_ordering() {
662            // Merge inner joins and intermediate filters into multijoin
663            // This rule assumes that filters have already been pushed down near to
664            // their relevant joins.
665            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
666
667            // Reorder multijoin into join tree.
668            if plan
669                .ctx()
670                .session_ctx()
671                .config()
672                .streaming_enable_bushy_join()
673            {
674                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
675            } else {
676                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
677            }
678        }
679
680        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
681        // conditions into a filter above the multijoin.
682        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
683
684        // For stream, push down predicates with now into a left-semi join
685        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
686
687        // Push down the calculation of inputs of join's condition.
688        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
689
690        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
691        // Must push down predicates again after split over window so that OverWindow can be
692        // optimized to TopN.
693        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
694        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
695        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
696
697        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
698        // TODO: better naming of the OptimizationStage
699        // Convert distinct aggregates.
700        plan = if force_split_distinct_agg {
701            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
702        } else {
703            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
704        };
705
706        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
707
708        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
709
710        // Do a final column pruning and predicate pushing down to clean up the plan.
711        plan = Self::column_pruning(plan, explain_trace, &ctx);
712        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
713
714        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
715        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
716
717        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
718
719        #[cfg(debug_assertions)]
720        InputRefValidator.validate(plan.clone());
721
722        if ctx.is_explain_logical() {
723            match ctx.explain_format() {
724                ExplainFormat::Text => {
725                    ctx.store_logical(plan.explain_to_string());
726                }
727                ExplainFormat::Json => {
728                    ctx.store_logical(plan.explain_to_json());
729                }
730                ExplainFormat::Xml => {
731                    ctx.store_logical(plan.explain_to_xml());
732                }
733                ExplainFormat::Yaml => {
734                    ctx.store_logical(plan.explain_to_yaml());
735                }
736                ExplainFormat::Dot => {
737                    ctx.store_logical(plan.explain_to_dot());
738                }
739            }
740        }
741
742        Ok(plan)
743    }
744
745    pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
746        let ctx = plan.ctx();
747        let explain_trace = ctx.is_explain_trace();
748
749        if explain_trace {
750            ctx.trace("Begin:");
751            ctx.trace(plan.explain_to_string());
752        }
753
754        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
755        plan = Self::inline_now_proc_time(plan, &ctx);
756
757        // Convert the dag back to the tree, because we don't support DAG plan for batch.
758        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
759
760        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
761        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
762        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
763        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
764        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
765        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
766        // Table function should be converted into `file_scan` before `project_set`.
767        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
768        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
769        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
770        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
771        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
772        // In order to unnest a table function, we need to convert it into a `project_set` first.
773        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
774
775        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
776
777        // Filter simplification must be applied before predicate push-down
778        // otherwise the filter for some nodes (e.g., `LogicalScan`)
779        // may not be properly applied.
780        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
781
782        // Predicate Push-down
783        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
784        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
785
786        if plan.ctx().session_ctx().config().enable_join_ordering() {
787            // Merge inner joins and intermediate filters into multijoin
788            // This rule assumes that filters have already been pushed down near to
789            // their relevant joins.
790            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
791
792            // Reorder multijoin into left-deep join tree.
793            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
794        }
795
796        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
797        // conditions into a filter above the multijoin.
798        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
799            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
800            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
801        }
802
803        // Push down the calculation of inputs of join's condition.
804        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
805
806        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
807        // Must push down predicates again after split over window so that OverWindow can be
808        // optimized to TopN.
809        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
810            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
811            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
812        }
813        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
814        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
815
816        // Convert distinct aggregates.
817        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
818
819        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
820
821        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
822
823        // Do a final column pruning and predicate pushing down to clean up the plan.
824        plan = Self::column_pruning(plan, explain_trace, &ctx);
825        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
826            (#[allow(unused_assignments)]
827            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
828            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
829        }
830
831        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
832        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
833
834        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
835
836        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
837
838        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
839
840        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
841
842        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
843
844        #[cfg(debug_assertions)]
845        InputRefValidator.validate(plan.clone());
846
847        if ctx.is_explain_logical() {
848            match ctx.explain_format() {
849                ExplainFormat::Text => {
850                    ctx.store_logical(plan.explain_to_string());
851                }
852                ExplainFormat::Json => {
853                    ctx.store_logical(plan.explain_to_json());
854                }
855                ExplainFormat::Xml => {
856                    ctx.store_logical(plan.explain_to_xml());
857                }
858                ExplainFormat::Yaml => {
859                    ctx.store_logical(plan.explain_to_yaml());
860                }
861                ExplainFormat::Dot => {
862                    ctx.store_logical(plan.explain_to_dot());
863                }
864            }
865        }
866
867        Ok(plan)
868    }
869}