risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92    stage_name: String,
93    rules: Vec<BoxedRule<C>>,
94    apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Entry point of the logical optimization pipelines.
///
/// The struct has no fields; it only groups the associated functions that run the stream
/// and batch logical optimization stages.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply internal source backfill progress rule next
141            TableFunctionToInternalSourceBackfillProgressRule::create(),
142            // Apply internal get channel delta stats rule next
143            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144            // Apply postgres query rule next
145            TableFunctionToPostgresQueryRule::create(),
146            // Apply mysql query rule next
147            TableFunctionToMySqlQueryRule::create(),
148            // Apply project set rule last
149            TableFunctionToProjectSetRule::create(),
150        ],
151        ApplyOrder::TopDown,
152    )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156    OptimizationStage::new(
157        "Table Function To FileScan",
158        vec![TableFunctionToFileScanRule::create()],
159        ApplyOrder::TopDown,
160    )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164    OptimizationStage::new(
165        "Table Function To PostgresQuery",
166        vec![TableFunctionToPostgresQueryRule::create()],
167        ApplyOrder::TopDown,
168    )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172    OptimizationStage::new(
173        "Table Function To MySQL",
174        vec![TableFunctionToMySqlQueryRule::create()],
175        ApplyOrder::TopDown,
176    )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180    LazyLock::new(|| {
181        OptimizationStage::new(
182            "Table Function To Internal Backfill Progress",
183            vec![TableFunctionToInternalBackfillProgressRule::create()],
184            ApplyOrder::TopDown,
185        )
186    });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189    LazyLock::new(|| {
190        OptimizationStage::new(
191            "Table Function To Internal Source Backfill Progress",
192            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193            ApplyOrder::TopDown,
194        )
195    });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198    LazyLock::new(|| {
199        OptimizationStage::new(
200            "Table Function To Internal Get Channel Delta Stats",
201            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202            ApplyOrder::TopDown,
203        )
204    });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207    OptimizationStage::new(
208        "Values Extract Project",
209        vec![ValuesExtractProjectRule::create()],
210        ApplyOrder::TopDown,
211    )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215    OptimizationStage::new(
216        "Simple Unnesting",
217        vec![
218            // Pull correlated predicates up the algebra tree to unnest simple subquery.
219            PullUpCorrelatedPredicateRule::create(),
220            // Pull correlated project expressions with values to inline scalar subqueries.
221            PullUpCorrelatedProjectValueRule::create(),
222            PullUpCorrelatedPredicateAggRule::create(),
223            // Eliminate max one row
224            MaxOneRowEliminateRule::create(),
225            // Convert apply to join.
226            ApplyToJoinRule::create(),
227        ],
228        ApplyOrder::BottomUp,
229    )
230});
231
232static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
233    OptimizationStage::new(
234        "Set Operation Merge",
235        vec![
236            UnionMergeRule::create(),
237            IntersectMergeRule::create(),
238            ExceptMergeRule::create(),
239        ],
240        ApplyOrder::BottomUp,
241    )
242});
243
244static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
245    LazyLock::new(|| {
246        OptimizationStage::new(
247            "General Unnesting(Translate Apply)",
248            vec![TranslateApplyRule::create(true)],
249            ApplyOrder::TopDown,
250        )
251    });
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
254    LazyLock::new(|| {
255        OptimizationStage::new(
256            "General Unnesting(Translate Apply)",
257            vec![TranslateApplyRule::create(false)],
258            ApplyOrder::TopDown,
259        )
260    });
261
262static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
263    OptimizationStage::new(
264        "General Unnesting(Push Down Apply)",
265        vec![
266            ApplyEliminateRule::create(),
267            ApplyAggTransposeRule::create(),
268            ApplyDedupTransposeRule::create(),
269            ApplyFilterTransposeRule::create(),
270            ApplyProjectTransposeRule::create(),
271            ApplyProjectSetTransposeRule::create(),
272            ApplyTopNTransposeRule::create(),
273            ApplyLimitTransposeRule::create(),
274            ApplyJoinTransposeRule::create(),
275            ApplyUnionTransposeRule::create(),
276            ApplyOverWindowTransposeRule::create(),
277            ApplyExpandTransposeRule::create(),
278            ApplyHopWindowTransposeRule::create(),
279            CrossJoinEliminateRule::create(),
280            ApplyShareEliminateRule::create(),
281        ],
282        ApplyOrder::TopDown,
283    )
284});
285
286static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287    OptimizationStage::new(
288        "To MultiJoin",
289        vec![MergeMultiJoinRule::create()],
290        ApplyOrder::TopDown,
291    )
292});
293
294static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
295    OptimizationStage::new(
296        "Join Ordering".to_owned(),
297        vec![LeftDeepTreeJoinOrderingRule::create()],
298        ApplyOrder::TopDown,
299    )
300});
301
302static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
303    OptimizationStage::new(
304        "Join Ordering".to_owned(),
305        vec![BushyTreeJoinOrderingRule::create()],
306        ApplyOrder::TopDown,
307    )
308});
309
310static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
311    OptimizationStage::new(
312        "Push down filter with now into a left semijoin",
313        vec![
314            SplitNowAndRule::create(),
315            SplitNowOrRule::create(),
316            FilterWithNowToJoinRule::create(),
317        ],
318        ApplyOrder::TopDown,
319    )
320});
321
322static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
323    OptimizationStage::new(
324        "Push down the calculation of inputs of join's condition",
325        vec![PushCalculationOfJoinRule::create()],
326        ApplyOrder::TopDown,
327    )
328});
329
330static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
331    OptimizationStage::new(
332        "Convert Distinct Aggregation",
333        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
334        ApplyOrder::TopDown,
335    )
336});
337
338static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
339    OptimizationStage::new(
340        "Convert Distinct Aggregation",
341        vec![
342            UnionToDistinctRule::create(),
343            DistinctAggRule::create(false),
344        ],
345        ApplyOrder::TopDown,
346    )
347});
348
349static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
350    OptimizationStage::new(
351        "Simplify Aggregation",
352        vec![
353            AggGroupBySimplifyRule::create(),
354            AggCallMergeRule::create(),
355            UnifyFirstLastValueRule::create(),
356        ],
357        ApplyOrder::TopDown,
358    )
359});
360
361static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
362    OptimizationStage::new(
363        "Join Commute".to_owned(),
364        vec![JoinCommuteRule::create()],
365        ApplyOrder::TopDown,
366    )
367});
368
369static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
370    OptimizationStage::new(
371        "Constant Output Operator Remove",
372        vec![EmptyAggRemoveRule::create()],
373        ApplyOrder::TopDown,
374    )
375});
376
377static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
378    OptimizationStage::new(
379        "Project Remove",
380        vec![
381            // merge should be applied before eliminate
382            ProjectMergeRule::create(),
383            ProjectEliminateRule::create(),
384            TrivialProjectToValuesRule::create(),
385            UnionInputValuesMergeRule::create(),
386            JoinProjectTransposeRule::create(),
387            // project-join merge should be applied after merge
388            // eliminate and to values
389            ProjectJoinMergeRule::create(),
390            AggProjectMergeRule::create(),
391        ],
392        ApplyOrder::BottomUp,
393    )
394});
395
396static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
397    OptimizationStage::new(
398        "Split Over Window",
399        vec![OverWindowSplitRule::create()],
400        ApplyOrder::TopDown,
401    )
402});
403
404// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
405// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
406// 2. should be after merge the multiple projects
407static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
408    OptimizationStage::new(
409        "Convert Over Window",
410        vec![
411            ProjectMergeRule::create(),
412            ProjectEliminateRule::create(),
413            TrivialProjectToValuesRule::create(),
414            UnionInputValuesMergeRule::create(),
415            OverWindowToAggAndJoinRule::create(),
416            OverWindowToTopNRule::create(),
417        ],
418        ApplyOrder::TopDown,
419    )
420});
421
422static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
423    OptimizationStage::new(
424        "Merge Over Window",
425        vec![OverWindowMergeRule::create()],
426        ApplyOrder::TopDown,
427    )
428});
429
430static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
431    OptimizationStage::new(
432        "Rewrite Like Expr",
433        vec![RewriteLikeExprRule::create()],
434        ApplyOrder::TopDown,
435    )
436});
437
438static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
439    OptimizationStage::new(
440        "TopN/SimpleAgg on Index",
441        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
442        ApplyOrder::TopDown,
443    )
444});
445
446static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
447    OptimizationStage::new(
448        "Void always-false filter's downstream",
449        vec![AlwaysFalseFilterRule::create()],
450        ApplyOrder::TopDown,
451    )
452});
453
454static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
455    OptimizationStage::new(
456        "Push Down Limit",
457        vec![LimitPushDownRule::create()],
458        ApplyOrder::TopDown,
459    )
460});
461
462static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
463    OptimizationStage::new(
464        "Pull Up Hop",
465        vec![PullUpHopRule::create()],
466        ApplyOrder::BottomUp,
467    )
468});
469
470static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
471    OptimizationStage::new(
472        "Set Operation To Join",
473        vec![
474            IntersectToSemiJoinRule::create(),
475            ExceptToAntiJoinRule::create(),
476        ],
477        ApplyOrder::BottomUp,
478    )
479});
480
481static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
482    OptimizationStage::new(
483        "Grouping Sets",
484        vec![
485            GroupingSetsToExpandRule::create(),
486            ExpandToProjectRule::create(),
487        ],
488        ApplyOrder::TopDown,
489    )
490});
491
492static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
493    OptimizationStage::new(
494        "Common Sub Expression Extract",
495        vec![CommonSubExprExtractRule::create()],
496        ApplyOrder::TopDown,
497    )
498});
499
500static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
501    OptimizationStage::new(
502        "Logical Filter Expression Simplify",
503        vec![LogicalFilterExpressionSimplifyRule::create()],
504        ApplyOrder::TopDown,
505    )
506});
507
508static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
509    OptimizationStage::new(
510        "Rewrite Source For Batch",
511        vec![
512            SourceToKafkaScanRule::create(),
513            SourceToIcebergScanRule::create(),
514        ],
515        ApplyOrder::TopDown,
516    )
517});
518
519static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
520    OptimizationStage::new(
521        "TopN to Vector Search",
522        vec![TopNToVectorSearchRule::create()],
523        ApplyOrder::BottomUp,
524    )
525});
526
527static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
528    LazyLock::new(|| {
529        OptimizationStage::new(
530            "Correlated TopN to Vector Search",
531            vec![CorrelatedTopNToVectorSearchRule::create(true)],
532            ApplyOrder::BottomUp,
533        )
534    });
535
536static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
537    LazyLock::new(|| {
538        OptimizationStage::new(
539            "Correlated TopN to Vector Search",
540            vec![CorrelatedTopNToVectorSearchRule::create(false)],
541            ApplyOrder::BottomUp,
542        )
543    });
544
545impl LogicalOptimizer {
546    pub fn predicate_pushdown(
547        plan: LogicalPlanRef,
548        explain_trace: bool,
549        ctx: &OptimizerContextRef,
550    ) -> LogicalPlanRef {
551        let plan = plan.predicate_pushdown(
552            Condition::true_cond(),
553            &mut PredicatePushdownContext::new(plan.clone()),
554        );
555        if explain_trace {
556            ctx.trace("Predicate Push Down:");
557            ctx.trace(plan.explain_to_string());
558        }
559        plan
560    }
561
562    pub fn subquery_unnesting(
563        mut plan: LogicalPlanRef,
564        enable_share_plan: bool,
565        explain_trace: bool,
566        ctx: &OptimizerContextRef,
567    ) -> Result<LogicalPlanRef> {
568        // Bail our if no apply operators.
569        if !has_logical_apply(plan.clone()) {
570            return Ok(plan);
571        }
572        // Simple Unnesting.
573        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
574        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
575        // Predicate push down before translate apply, because we need to calculate the domain
576        // and predicate push down can reduce the size of domain.
577        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
578        // In order to unnest values with correlated input ref, we need to extract project first.
579        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
580        // General Unnesting.
581        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
582        // join.
583        plan = if enable_share_plan {
584            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
585        } else {
586            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
587        };
588        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
589
590        // Check if all `Apply`s are eliminated and the subquery is unnested.
591        plan.check_apply_elimination()?;
592
593        Ok(plan)
594    }
595
596    pub fn column_pruning(
597        mut plan: LogicalPlanRef,
598        explain_trace: bool,
599        ctx: &OptimizerContextRef,
600    ) -> LogicalPlanRef {
601        let required_cols = (0..plan.schema().len()).collect_vec();
602        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
603        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
604        // Column pruning may introduce additional projects, and filter can be pushed again.
605        if explain_trace {
606            ctx.trace("Prune Columns:");
607            ctx.trace(plan.explain_to_string());
608        }
609
610        if column_pruning_ctx.need_second_round() {
611            // Second round of column pruning and reuse the column pruning context.
612            // Try to replace original share operator with the new one.
613            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
614            if explain_trace {
615                ctx.trace("Prune Columns (For DAG):");
616                ctx.trace(plan.explain_to_string());
617            }
618        }
619        plan
620    }
621
622    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
623        // If now() and proctime() are not found, bail out.
624        let mut v = NowProcTimeFinder::default();
625        plan.visit_exprs_recursive(&mut v);
626        if !v.has() {
627            return plan;
628        }
629
630        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
631
632        let plan = plan.rewrite_exprs_recursive(&mut v);
633
634        if ctx.is_explain_trace() {
635            ctx.trace("Inline Now and ProcTime:");
636            ctx.trace(plan.explain_to_string());
637        }
638        plan
639    }
640
641    pub fn gen_optimized_logical_plan_for_stream(
642        mut plan: LogicalPlanRef,
643    ) -> Result<LogicalPlanRef> {
644        let ctx = plan.ctx();
645        let explain_trace = ctx.is_explain_trace();
646
647        if explain_trace {
648            ctx.trace("Begin:");
649            ctx.trace(plan.explain_to_string());
650        }
651
652        // Convert grouping sets at first because other agg rule can't handle grouping sets.
653        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
654        // Remove nodes with constant output.
655        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
656        // Remove project to make common sub-plan sharing easier.
657        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
658
659        // If share plan is disable, we need to remove all the share operator generated by the
660        // binder, e.g. CTE and View. However, we still need to share source to ensure self
661        // source join can return correct result.
662        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
663        if enable_share_plan {
664            // Common sub-plan sharing.
665            plan = plan.common_subplan_sharing();
666            plan = plan.prune_share();
667            if explain_trace {
668                ctx.trace("Common Sub-plan Sharing:");
669                ctx.trace(plan.explain_to_string());
670            }
671        } else {
672            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
673
674            // Replace source to share source.
675            // Perform share source at the beginning so that we can benefit from predicate pushdown
676            // and column pruning for the share operator.
677            plan = ShareSourceRewriter::share_source(plan);
678            if explain_trace {
679                ctx.trace("Share Source:");
680                ctx.trace(plan.explain_to_string());
681            }
682        }
683        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
684        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
685        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
686        // Should be applied before converting table function to project set.
687        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
688        // In order to unnest a table function, we need to convert it into a `project_set` first.
689        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
690
691        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;
692
693        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
694        if has_logical_max_one_row(plan.clone()) {
695            // `MaxOneRow` is currently only used for the runtime check of
696            // scalar subqueries, while it's not supported in streaming mode, so
697            // we raise a precise error here.
698            bail!("Scalar subquery might produce more than one row.");
699        }
700
701        // Same to batch plan optimization, this rule shall be applied before
702        // predicate push down
703        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
704
705        // Predicate Push-down
706        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
707
708        if plan.ctx().session_ctx().config().enable_join_ordering() {
709            // Merge inner joins and intermediate filters into multijoin
710            // This rule assumes that filters have already been pushed down near to
711            // their relevant joins.
712            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
713
714            // Reorder multijoin into join tree.
715            if plan
716                .ctx()
717                .session_ctx()
718                .config()
719                .streaming_enable_bushy_join()
720            {
721                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
722            } else {
723                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
724            }
725        }
726
727        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
728        // conditions into a filter above the multijoin.
729        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
730
731        // For stream, push down predicates with now into a left-semi join
732        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
733
734        // Push down the calculation of inputs of join's condition.
735        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
736
737        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
738        // Must push down predicates again after split over window so that OverWindow can be
739        // optimized to TopN.
740        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
741        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
742        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
743
744        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
745        // TODO: better naming of the OptimizationStage
746        // Convert distinct aggregates.
747        plan = if force_split_distinct_agg {
748            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
749        } else {
750            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
751        };
752
753        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
754
755        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
756
757        // Do a final column pruning and predicate pushing down to clean up the plan.
758        plan = Self::column_pruning(plan, explain_trace, &ctx);
759        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
760
761        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
762        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
763
764        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
765
766        #[cfg(debug_assertions)]
767        InputRefValidator.validate(plan.clone());
768
769        ctx.may_store_explain_logical(&plan);
770
771        Ok(plan)
772    }
773
774    pub fn gen_optimized_logical_plan_for_batch(
775        mut plan: LogicalPlanRef,
776    ) -> Result<LogicalPlanRef> {
777        let ctx = plan.ctx();
778        let explain_trace = ctx.is_explain_trace();
779
780        if explain_trace {
781            ctx.trace("Begin:");
782            ctx.trace(plan.explain_to_string());
783        }
784
785        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
786        plan = Self::inline_now_proc_time(plan, &ctx);
787
788        // Convert the dag back to the tree, because we don't support DAG plan for batch.
789        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
790
791        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
792        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
793        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
794        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
795        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
796        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
797        // Table function should be converted into `file_scan` before `project_set`.
798        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
799        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
800        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
801        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
802        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
803        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
804        // In order to unnest a table function, we need to convert it into a `project_set` first.
805        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
806
807        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
808
809        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
810
811        // Filter simplification must be applied before predicate push-down
812        // otherwise the filter for some nodes (e.g., `LogicalScan`)
813        // may not be properly applied.
814        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
815
816        // Predicate Push-down
817        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
818        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
819
820        if plan.ctx().session_ctx().config().enable_join_ordering() {
821            // Merge inner joins and intermediate filters into multijoin
822            // This rule assumes that filters have already been pushed down near to
823            // their relevant joins.
824            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
825
826            // Reorder multijoin into left-deep join tree.
827            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
828        }
829
830        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
831        // conditions into a filter above the multijoin.
832        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
833            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
834            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
835        }
836
837        // Push down the calculation of inputs of join's condition.
838        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
839
840        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
841        // Must push down predicates again after split over window so that OverWindow can be
842        // optimized to TopN.
843        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
844            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
845            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
846        }
847        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
848        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
849
850        // Convert distinct aggregates.
851        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
852
853        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
854
855        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
856
857        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
858
859        // Do a final column pruning and predicate pushing down to clean up the plan.
860        plan = Self::column_pruning(plan, explain_trace, &ctx);
861        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
862            (#[allow(unused_assignments)]
863            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
864            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
865        }
866
867        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
868        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
869
870        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
871
872        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
873
874        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
875
876        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
877
878        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
879
880        #[cfg(debug_assertions)]
881        InputRefValidator.validate(plan.clone());
882
883        ctx.may_store_explain_logical(&plan);
884
885        Ok(plan)
886    }
887}