risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92    stage_name: String,
93    rules: Vec<BoxedRule<C>>,
94    apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
114use crate::optimizer::plan_node::generic::GenericPlanRef;
115
/// Namespace for the logical-plan optimization pipelines
/// (`gen_optimized_logical_plan_for_stream` / `gen_optimized_logical_plan_for_batch`)
/// and the passes they share (predicate push-down, column pruning, subquery unnesting, ...).
pub struct LogicalOptimizer {}
117
118static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
119    OptimizationStage::new(
120        "DAG To Tree",
121        vec![DagToTreeRule::create()],
122        ApplyOrder::TopDown,
123    )
124});
125
126static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
127    OptimizationStage::new(
128        "Convert GENERATE_SERIES Ends With NOW",
129        vec![GenerateSeriesWithNowRule::create()],
130        ApplyOrder::TopDown,
131    )
132});
133
134static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
135    OptimizationStage::new(
136        "Table Function Convert",
137        vec![
138            // Apply file scan rule first
139            TableFunctionToFileScanRule::create(),
140            // Apply internal backfill progress rule first
141            TableFunctionToInternalBackfillProgressRule::create(),
142            // Apply internal source backfill progress rule next
143            TableFunctionToInternalSourceBackfillProgressRule::create(),
144            // Apply postgres query rule next
145            TableFunctionToPostgresQueryRule::create(),
146            // Apply mysql query rule next
147            TableFunctionToMySqlQueryRule::create(),
148            // Apply project set rule last
149            TableFunctionToProjectSetRule::create(),
150        ],
151        ApplyOrder::TopDown,
152    )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156    OptimizationStage::new(
157        "Table Function To FileScan",
158        vec![TableFunctionToFileScanRule::create()],
159        ApplyOrder::TopDown,
160    )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164    OptimizationStage::new(
165        "Table Function To PostgresQuery",
166        vec![TableFunctionToPostgresQueryRule::create()],
167        ApplyOrder::TopDown,
168    )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172    OptimizationStage::new(
173        "Table Function To MySQL",
174        vec![TableFunctionToMySqlQueryRule::create()],
175        ApplyOrder::TopDown,
176    )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180    LazyLock::new(|| {
181        OptimizationStage::new(
182            "Table Function To Internal Backfill Progress",
183            vec![TableFunctionToInternalBackfillProgressRule::create()],
184            ApplyOrder::TopDown,
185        )
186    });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189    LazyLock::new(|| {
190        OptimizationStage::new(
191            "Table Function To Internal Source Backfill Progress",
192            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193            ApplyOrder::TopDown,
194        )
195    });
196
197static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
198    OptimizationStage::new(
199        "Values Extract Project",
200        vec![ValuesExtractProjectRule::create()],
201        ApplyOrder::TopDown,
202    )
203});
204
205static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
206    OptimizationStage::new(
207        "Simple Unnesting",
208        vec![
209            // Pull correlated predicates up the algebra tree to unnest simple subquery.
210            PullUpCorrelatedPredicateRule::create(),
211            // Pull correlated project expressions with values to inline scalar subqueries.
212            PullUpCorrelatedProjectValueRule::create(),
213            PullUpCorrelatedPredicateAggRule::create(),
214            // Eliminate max one row
215            MaxOneRowEliminateRule::create(),
216            // Convert apply to join.
217            ApplyToJoinRule::create(),
218        ],
219        ApplyOrder::BottomUp,
220    )
221});
222
223static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
224    OptimizationStage::new(
225        "Set Operation Merge",
226        vec![
227            UnionMergeRule::create(),
228            IntersectMergeRule::create(),
229            ExceptMergeRule::create(),
230        ],
231        ApplyOrder::BottomUp,
232    )
233});
234
235static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
236    LazyLock::new(|| {
237        OptimizationStage::new(
238            "General Unnesting(Translate Apply)",
239            vec![TranslateApplyRule::create(true)],
240            ApplyOrder::TopDown,
241        )
242    });
243
244static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
245    LazyLock::new(|| {
246        OptimizationStage::new(
247            "General Unnesting(Translate Apply)",
248            vec![TranslateApplyRule::create(false)],
249            ApplyOrder::TopDown,
250        )
251    });
252
253static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
254    OptimizationStage::new(
255        "General Unnesting(Push Down Apply)",
256        vec![
257            ApplyEliminateRule::create(),
258            ApplyAggTransposeRule::create(),
259            ApplyDedupTransposeRule::create(),
260            ApplyFilterTransposeRule::create(),
261            ApplyProjectTransposeRule::create(),
262            ApplyProjectSetTransposeRule::create(),
263            ApplyTopNTransposeRule::create(),
264            ApplyLimitTransposeRule::create(),
265            ApplyJoinTransposeRule::create(),
266            ApplyUnionTransposeRule::create(),
267            ApplyOverWindowTransposeRule::create(),
268            ApplyExpandTransposeRule::create(),
269            ApplyHopWindowTransposeRule::create(),
270            CrossJoinEliminateRule::create(),
271            ApplyShareEliminateRule::create(),
272        ],
273        ApplyOrder::TopDown,
274    )
275});
276
277static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
278    OptimizationStage::new(
279        "To MultiJoin",
280        vec![MergeMultiJoinRule::create()],
281        ApplyOrder::TopDown,
282    )
283});
284
285static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
286    OptimizationStage::new(
287        "Join Ordering".to_owned(),
288        vec![LeftDeepTreeJoinOrderingRule::create()],
289        ApplyOrder::TopDown,
290    )
291});
292
293static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
294    OptimizationStage::new(
295        "Join Ordering".to_owned(),
296        vec![BushyTreeJoinOrderingRule::create()],
297        ApplyOrder::TopDown,
298    )
299});
300
301static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
302    OptimizationStage::new(
303        "Push down filter with now into a left semijoin",
304        vec![
305            SplitNowAndRule::create(),
306            SplitNowOrRule::create(),
307            FilterWithNowToJoinRule::create(),
308        ],
309        ApplyOrder::TopDown,
310    )
311});
312
313static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
314    OptimizationStage::new(
315        "Push down the calculation of inputs of join's condition",
316        vec![PushCalculationOfJoinRule::create()],
317        ApplyOrder::TopDown,
318    )
319});
320
321static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
322    OptimizationStage::new(
323        "Convert Distinct Aggregation",
324        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
325        ApplyOrder::TopDown,
326    )
327});
328
329static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
330    OptimizationStage::new(
331        "Convert Distinct Aggregation",
332        vec![
333            UnionToDistinctRule::create(),
334            DistinctAggRule::create(false),
335        ],
336        ApplyOrder::TopDown,
337    )
338});
339
340static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
341    OptimizationStage::new(
342        "Simplify Aggregation",
343        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
344        ApplyOrder::TopDown,
345    )
346});
347
348static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
349    OptimizationStage::new(
350        "Join Commute".to_owned(),
351        vec![JoinCommuteRule::create()],
352        ApplyOrder::TopDown,
353    )
354});
355
356static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
357    OptimizationStage::new(
358        "Constant Output Operator Remove",
359        vec![EmptyAggRemoveRule::create()],
360        ApplyOrder::TopDown,
361    )
362});
363
364static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
365    OptimizationStage::new(
366        "Project Remove",
367        vec![
368            // merge should be applied before eliminate
369            ProjectMergeRule::create(),
370            ProjectEliminateRule::create(),
371            TrivialProjectToValuesRule::create(),
372            UnionInputValuesMergeRule::create(),
373            JoinProjectTransposeRule::create(),
374            // project-join merge should be applied after merge
375            // eliminate and to values
376            ProjectJoinMergeRule::create(),
377            AggProjectMergeRule::create(),
378        ],
379        ApplyOrder::BottomUp,
380    )
381});
382
383static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
384    OptimizationStage::new(
385        "Split Over Window",
386        vec![OverWindowSplitRule::create()],
387        ApplyOrder::TopDown,
388    )
389});
390
391// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
392// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
393// 2. should be after merge the multiple projects
394static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
395    OptimizationStage::new(
396        "Convert Over Window",
397        vec![
398            ProjectMergeRule::create(),
399            ProjectEliminateRule::create(),
400            TrivialProjectToValuesRule::create(),
401            UnionInputValuesMergeRule::create(),
402            OverWindowToAggAndJoinRule::create(),
403            OverWindowToTopNRule::create(),
404        ],
405        ApplyOrder::TopDown,
406    )
407});
408
409static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
410    OptimizationStage::new(
411        "Merge Over Window",
412        vec![OverWindowMergeRule::create()],
413        ApplyOrder::TopDown,
414    )
415});
416
417static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
418    OptimizationStage::new(
419        "Rewrite Like Expr",
420        vec![RewriteLikeExprRule::create()],
421        ApplyOrder::TopDown,
422    )
423});
424
425static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
426    OptimizationStage::new(
427        "TopN/SimpleAgg on Index",
428        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
429        ApplyOrder::TopDown,
430    )
431});
432
433static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
434    OptimizationStage::new(
435        "Void always-false filter's downstream",
436        vec![AlwaysFalseFilterRule::create()],
437        ApplyOrder::TopDown,
438    )
439});
440
441static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
442    OptimizationStage::new(
443        "Push Down Limit",
444        vec![LimitPushDownRule::create()],
445        ApplyOrder::TopDown,
446    )
447});
448
449static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
450    OptimizationStage::new(
451        "Pull Up Hop",
452        vec![PullUpHopRule::create()],
453        ApplyOrder::BottomUp,
454    )
455});
456
457static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
458    OptimizationStage::new(
459        "Set Operation To Join",
460        vec![
461            IntersectToSemiJoinRule::create(),
462            ExceptToAntiJoinRule::create(),
463        ],
464        ApplyOrder::BottomUp,
465    )
466});
467
468static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
469    OptimizationStage::new(
470        "Grouping Sets",
471        vec![
472            GroupingSetsToExpandRule::create(),
473            ExpandToProjectRule::create(),
474        ],
475        ApplyOrder::TopDown,
476    )
477});
478
479static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
480    OptimizationStage::new(
481        "Common Sub Expression Extract",
482        vec![CommonSubExprExtractRule::create()],
483        ApplyOrder::TopDown,
484    )
485});
486
487static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
488    OptimizationStage::new(
489        "Logical Filter Expression Simplify",
490        vec![LogicalFilterExpressionSimplifyRule::create()],
491        ApplyOrder::TopDown,
492    )
493});
494
495static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
496    OptimizationStage::new(
497        "Rewrite Source For Batch",
498        vec![
499            SourceToKafkaScanRule::create(),
500            SourceToIcebergScanRule::create(),
501        ],
502        ApplyOrder::TopDown,
503    )
504});
505
506impl LogicalOptimizer {
507    pub fn predicate_pushdown(
508        plan: LogicalPlanRef,
509        explain_trace: bool,
510        ctx: &OptimizerContextRef,
511    ) -> LogicalPlanRef {
512        let plan = plan.predicate_pushdown(
513            Condition::true_cond(),
514            &mut PredicatePushdownContext::new(plan.clone()),
515        );
516        if explain_trace {
517            ctx.trace("Predicate Push Down:");
518            ctx.trace(plan.explain_to_string());
519        }
520        plan
521    }
522
523    pub fn subquery_unnesting(
524        mut plan: LogicalPlanRef,
525        enable_share_plan: bool,
526        explain_trace: bool,
527        ctx: &OptimizerContextRef,
528    ) -> Result<LogicalPlanRef> {
529        // Bail our if no apply operators.
530        if !has_logical_apply(plan.clone()) {
531            return Ok(plan);
532        }
533        // Simple Unnesting.
534        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
535        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
536        // Predicate push down before translate apply, because we need to calculate the domain
537        // and predicate push down can reduce the size of domain.
538        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
539        // In order to unnest values with correlated input ref, we need to extract project first.
540        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
541        // General Unnesting.
542        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
543        // join.
544        plan = if enable_share_plan {
545            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
546        } else {
547            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
548        };
549        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
550
551        // Check if all `Apply`s are eliminated and the subquery is unnested.
552        plan.check_apply_elimination()?;
553
554        Ok(plan)
555    }
556
557    pub fn column_pruning(
558        mut plan: LogicalPlanRef,
559        explain_trace: bool,
560        ctx: &OptimizerContextRef,
561    ) -> LogicalPlanRef {
562        let required_cols = (0..plan.schema().len()).collect_vec();
563        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
564        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
565        // Column pruning may introduce additional projects, and filter can be pushed again.
566        if explain_trace {
567            ctx.trace("Prune Columns:");
568            ctx.trace(plan.explain_to_string());
569        }
570
571        if column_pruning_ctx.need_second_round() {
572            // Second round of column pruning and reuse the column pruning context.
573            // Try to replace original share operator with the new one.
574            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
575            if explain_trace {
576                ctx.trace("Prune Columns (For DAG):");
577                ctx.trace(plan.explain_to_string());
578            }
579        }
580        plan
581    }
582
583    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
584        // If now() and proctime() are not found, bail out.
585        let mut v = NowProcTimeFinder::default();
586        plan.visit_exprs_recursive(&mut v);
587        if !v.has() {
588            return plan;
589        }
590
591        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
592
593        let plan = plan.rewrite_exprs_recursive(&mut v);
594
595        if ctx.is_explain_trace() {
596            ctx.trace("Inline Now and ProcTime:");
597            ctx.trace(plan.explain_to_string());
598        }
599        plan
600    }
601
602    pub fn gen_optimized_logical_plan_for_stream(
603        mut plan: LogicalPlanRef,
604    ) -> Result<LogicalPlanRef> {
605        let ctx = plan.ctx();
606        let explain_trace = ctx.is_explain_trace();
607
608        if explain_trace {
609            ctx.trace("Begin:");
610            ctx.trace(plan.explain_to_string());
611        }
612
613        // Convert grouping sets at first because other agg rule can't handle grouping sets.
614        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
615        // Remove nodes with constant output.
616        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
617        // Remove project to make common sub-plan sharing easier.
618        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
619
620        // If share plan is disable, we need to remove all the share operator generated by the
621        // binder, e.g. CTE and View. However, we still need to share source to ensure self
622        // source join can return correct result.
623        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
624        if enable_share_plan {
625            // Common sub-plan sharing.
626            plan = plan.common_subplan_sharing();
627            plan = plan.prune_share();
628            if explain_trace {
629                ctx.trace("Common Sub-plan Sharing:");
630                ctx.trace(plan.explain_to_string());
631            }
632        } else {
633            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
634
635            // Replace source to share source.
636            // Perform share source at the beginning so that we can benefit from predicate pushdown
637            // and column pruning for the share operator.
638            plan = ShareSourceRewriter::share_source(plan);
639            if explain_trace {
640                ctx.trace("Share Source:");
641                ctx.trace(plan.explain_to_string());
642            }
643        }
644        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
645        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
646        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
647        // Should be applied before converting table function to project set.
648        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
649        // In order to unnest a table function, we need to convert it into a `project_set` first.
650        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
651
652        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
653        if has_logical_max_one_row(plan.clone()) {
654            // `MaxOneRow` is currently only used for the runtime check of
655            // scalar subqueries, while it's not supported in streaming mode, so
656            // we raise a precise error here.
657            bail!("Scalar subquery might produce more than one row.");
658        }
659
660        // Same to batch plan optimization, this rule shall be applied before
661        // predicate push down
662        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
663
664        // Predicate Push-down
665        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
666
667        if plan.ctx().session_ctx().config().enable_join_ordering() {
668            // Merge inner joins and intermediate filters into multijoin
669            // This rule assumes that filters have already been pushed down near to
670            // their relevant joins.
671            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
672
673            // Reorder multijoin into join tree.
674            if plan
675                .ctx()
676                .session_ctx()
677                .config()
678                .streaming_enable_bushy_join()
679            {
680                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
681            } else {
682                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
683            }
684        }
685
686        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
687        // conditions into a filter above the multijoin.
688        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
689
690        // For stream, push down predicates with now into a left-semi join
691        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
692
693        // Push down the calculation of inputs of join's condition.
694        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
695
696        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
697        // Must push down predicates again after split over window so that OverWindow can be
698        // optimized to TopN.
699        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
700        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
701        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
702
703        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
704        // TODO: better naming of the OptimizationStage
705        // Convert distinct aggregates.
706        plan = if force_split_distinct_agg {
707            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
708        } else {
709            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
710        };
711
712        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
713
714        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
715
716        // Do a final column pruning and predicate pushing down to clean up the plan.
717        plan = Self::column_pruning(plan, explain_trace, &ctx);
718        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
719
720        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
721        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
722
723        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
724
725        #[cfg(debug_assertions)]
726        InputRefValidator.validate(plan.clone());
727
728        if ctx.is_explain_logical() {
729            match ctx.explain_format() {
730                ExplainFormat::Text => {
731                    ctx.store_logical(plan.explain_to_string());
732                }
733                ExplainFormat::Json => {
734                    ctx.store_logical(plan.explain_to_json());
735                }
736                ExplainFormat::Xml => {
737                    ctx.store_logical(plan.explain_to_xml());
738                }
739                ExplainFormat::Yaml => {
740                    ctx.store_logical(plan.explain_to_yaml());
741                }
742                ExplainFormat::Dot => {
743                    ctx.store_logical(plan.explain_to_dot());
744                }
745            }
746        }
747
748        Ok(plan)
749    }
750
751    pub fn gen_optimized_logical_plan_for_batch(
752        mut plan: LogicalPlanRef,
753    ) -> Result<LogicalPlanRef> {
754        let ctx = plan.ctx();
755        let explain_trace = ctx.is_explain_trace();
756
757        if explain_trace {
758            ctx.trace("Begin:");
759            ctx.trace(plan.explain_to_string());
760        }
761
762        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
763        plan = Self::inline_now_proc_time(plan, &ctx);
764
765        // Convert the dag back to the tree, because we don't support DAG plan for batch.
766        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
767
768        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
769        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
770        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
771        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
772        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
773        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
774        // Table function should be converted into `file_scan` before `project_set`.
775        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
776        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
777        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
778        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
779        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
780        // In order to unnest a table function, we need to convert it into a `project_set` first.
781        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
782
783        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
784
785        // Filter simplification must be applied before predicate push-down
786        // otherwise the filter for some nodes (e.g., `LogicalScan`)
787        // may not be properly applied.
788        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
789
790        // Predicate Push-down
791        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
792        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
793
794        if plan.ctx().session_ctx().config().enable_join_ordering() {
795            // Merge inner joins and intermediate filters into multijoin
796            // This rule assumes that filters have already been pushed down near to
797            // their relevant joins.
798            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
799
800            // Reorder multijoin into left-deep join tree.
801            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
802        }
803
804        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
805        // conditions into a filter above the multijoin.
806        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
807            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
808            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
809        }
810
811        // Push down the calculation of inputs of join's condition.
812        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
813
814        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
815        // Must push down predicates again after split over window so that OverWindow can be
816        // optimized to TopN.
817        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
818            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
819            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
820        }
821        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
822        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
823
824        // Convert distinct aggregates.
825        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
826
827        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
828
829        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
830
831        // Do a final column pruning and predicate pushing down to clean up the plan.
832        plan = Self::column_pruning(plan, explain_trace, &ctx);
833        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
834            (#[allow(unused_assignments)]
835            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
836            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
837        }
838
839        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
840        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
841
842        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
843
844        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
845
846        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
847
848        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
849
850        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
851
852        #[cfg(debug_assertions)]
853        InputRefValidator.validate(plan.clone());
854
855        if ctx.is_explain_logical() {
856            match ctx.explain_format() {
857                ExplainFormat::Text => {
858                    ctx.store_logical(plan.explain_to_string());
859                }
860                ExplainFormat::Json => {
861                    ctx.store_logical(plan.explain_to_json());
862                }
863                ExplainFormat::Xml => {
864                    ctx.store_logical(plan.explain_to_xml());
865                }
866                ExplainFormat::Yaml => {
867                    ctx.store_logical(plan.explain_to_yaml());
868                }
869                ExplainFormat::Dot => {
870                    ctx.store_logical(plan.explain_to_dot());
871                }
872            }
873        }
874
875        Ok(plan)
876    }
877}