risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92    stage_name: String,
93    rules: Vec<BoxedRule<C>>,
94    apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Stateless namespace type: the logical optimization passes in this file are associated
/// functions on it and none of them takes `self`.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply internal source backfill progress rule next
141            TableFunctionToInternalSourceBackfillProgressRule::create(),
142            // Apply internal get channel delta stats rule next
143            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144            // Apply postgres query rule next
145            TableFunctionToPostgresQueryRule::create(),
146            // Apply mysql query rule next
147            TableFunctionToMySqlQueryRule::create(),
148            // Apply project set rule last
149            TableFunctionToProjectSetRule::create(),
150        ],
151        ApplyOrder::TopDown,
152    )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156    OptimizationStage::new(
157        "Table Function To FileScan",
158        vec![TableFunctionToFileScanRule::create()],
159        ApplyOrder::TopDown,
160    )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164    OptimizationStage::new(
165        "Table Function To PostgresQuery",
166        vec![TableFunctionToPostgresQueryRule::create()],
167        ApplyOrder::TopDown,
168    )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172    OptimizationStage::new(
173        "Table Function To MySQL",
174        vec![TableFunctionToMySqlQueryRule::create()],
175        ApplyOrder::TopDown,
176    )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180    LazyLock::new(|| {
181        OptimizationStage::new(
182            "Table Function To Internal Backfill Progress",
183            vec![TableFunctionToInternalBackfillProgressRule::create()],
184            ApplyOrder::TopDown,
185        )
186    });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189    LazyLock::new(|| {
190        OptimizationStage::new(
191            "Table Function To Internal Source Backfill Progress",
192            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193            ApplyOrder::TopDown,
194        )
195    });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198    LazyLock::new(|| {
199        OptimizationStage::new(
200            "Table Function To Internal Get Channel Delta Stats",
201            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202            ApplyOrder::TopDown,
203        )
204    });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207    OptimizationStage::new(
208        "Values Extract Project",
209        vec![ValuesExtractProjectRule::create()],
210        ApplyOrder::TopDown,
211    )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215    OptimizationStage::new(
216        "Simple Unnesting",
217        vec![
218            // Pull correlated predicates up the algebra tree to unnest simple subquery.
219            PullUpCorrelatedPredicateRule::create(),
220            // Pull correlated project expressions with values to inline scalar subqueries.
221            PullUpCorrelatedProjectValueRule::create(),
222            PullUpCorrelatedPredicateAggRule::create(),
223            // Eliminate max one row
224            MaxOneRowEliminateRule::create(),
225            // Convert apply to join.
226            ApplyToJoinRule::create(),
227        ],
228        ApplyOrder::BottomUp,
229    )
230});
231
232static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
233    OptimizationStage::new(
234        "Set Operation Merge",
235        vec![
236            UnionMergeRule::create(),
237            IntersectMergeRule::create(),
238            ExceptMergeRule::create(),
239        ],
240        ApplyOrder::BottomUp,
241    )
242});
243
244static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
245    LazyLock::new(|| {
246        OptimizationStage::new(
247            "General Unnesting(Translate Apply)",
248            vec![TranslateApplyRule::create(true)],
249            ApplyOrder::TopDown,
250        )
251    });
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
254    LazyLock::new(|| {
255        OptimizationStage::new(
256            "General Unnesting(Translate Apply)",
257            vec![TranslateApplyRule::create(false)],
258            ApplyOrder::TopDown,
259        )
260    });
261
262static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
263    OptimizationStage::new(
264        "General Unnesting(Push Down Apply)",
265        vec![
266            ApplyEliminateRule::create(),
267            ApplyAggTransposeRule::create(),
268            ApplyDedupTransposeRule::create(),
269            ApplyFilterTransposeRule::create(),
270            ApplyProjectTransposeRule::create(),
271            ApplyProjectSetTransposeRule::create(),
272            ApplyTopNTransposeRule::create(),
273            ApplyLimitTransposeRule::create(),
274            ApplyJoinTransposeRule::create(),
275            ApplyUnionTransposeRule::create(),
276            ApplyOverWindowTransposeRule::create(),
277            ApplyExpandTransposeRule::create(),
278            ApplyHopWindowTransposeRule::create(),
279            CrossJoinEliminateRule::create(),
280            ApplyShareEliminateRule::create(),
281        ],
282        ApplyOrder::TopDown,
283    )
284});
285
286static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287    OptimizationStage::new(
288        "To MultiJoin",
289        vec![MergeMultiJoinRule::create()],
290        ApplyOrder::TopDown,
291    )
292});
293
294static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
295    OptimizationStage::new(
296        "Join Ordering".to_owned(),
297        vec![LeftDeepTreeJoinOrderingRule::create()],
298        ApplyOrder::TopDown,
299    )
300});
301
302static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
303    OptimizationStage::new(
304        "Join Ordering".to_owned(),
305        vec![BushyTreeJoinOrderingRule::create()],
306        ApplyOrder::TopDown,
307    )
308});
309
310static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
311    OptimizationStage::new(
312        "Push down filter with now into a left semijoin",
313        vec![
314            SplitNowAndRule::create(),
315            SplitNowOrRule::create(),
316            FilterWithNowToJoinRule::create(),
317        ],
318        ApplyOrder::TopDown,
319    )
320});
321
322static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
323    OptimizationStage::new(
324        "Push down the calculation of inputs of join's condition",
325        vec![PushCalculationOfJoinRule::create()],
326        ApplyOrder::TopDown,
327    )
328});
329
330static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
331    OptimizationStage::new(
332        "Convert Distinct Aggregation",
333        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
334        ApplyOrder::TopDown,
335    )
336});
337
338static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
339    OptimizationStage::new(
340        "Convert Distinct Aggregation",
341        vec![
342            UnionToDistinctRule::create(),
343            DistinctAggRule::create(false),
344        ],
345        ApplyOrder::TopDown,
346    )
347});
348
349static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
350    OptimizationStage::new(
351        "Simplify Aggregation",
352        vec![
353            AggGroupBySimplifyRule::create(),
354            AggCallMergeRule::create(),
355            UnifyFirstLastValueRule::create(),
356        ],
357        ApplyOrder::TopDown,
358    )
359});
360
361static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
362    OptimizationStage::new(
363        "Join Commute".to_owned(),
364        vec![JoinCommuteRule::create()],
365        ApplyOrder::TopDown,
366    )
367});
368
369static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
370    OptimizationStage::new(
371        "Constant Output Operator Remove",
372        vec![EmptyAggRemoveRule::create()],
373        ApplyOrder::TopDown,
374    )
375});
376
377static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
378    OptimizationStage::new(
379        "Project Remove",
380        vec![
381            // merge should be applied before eliminate
382            ProjectMergeRule::create(),
383            ProjectEliminateRule::create(),
384            TrivialProjectToValuesRule::create(),
385            UnionInputValuesMergeRule::create(),
386            JoinProjectTransposeRule::create(),
387            // project-join merge should be applied after merge
388            // eliminate and to values
389            ProjectJoinMergeRule::create(),
390            AggProjectMergeRule::create(),
391        ],
392        ApplyOrder::BottomUp,
393    )
394});
395
396static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
397    OptimizationStage::new(
398        "Split Over Window",
399        vec![OverWindowSplitRule::create()],
400        ApplyOrder::TopDown,
401    )
402});
403
404// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
405// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
406// 2. should be after merge the multiple projects
407static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
408    OptimizationStage::new(
409        "Convert Over Window",
410        vec![
411            ProjectMergeRule::create(),
412            ProjectEliminateRule::create(),
413            TrivialProjectToValuesRule::create(),
414            UnionInputValuesMergeRule::create(),
415            OverWindowToAggAndJoinRule::create(),
416            OverWindowToTopNRule::create(),
417        ],
418        ApplyOrder::TopDown,
419    )
420});
421
422static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
423    OptimizationStage::new(
424        "Merge Over Window",
425        vec![OverWindowMergeRule::create()],
426        ApplyOrder::TopDown,
427    )
428});
429
430static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
431    OptimizationStage::new(
432        "Rewrite Like Expr",
433        vec![RewriteLikeExprRule::create()],
434        ApplyOrder::TopDown,
435    )
436});
437
438static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
439    OptimizationStage::new(
440        "TopN/SimpleAgg on Index",
441        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
442        ApplyOrder::TopDown,
443    )
444});
445
446static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
447    OptimizationStage::new(
448        "Void always-false filter's downstream",
449        vec![AlwaysFalseFilterRule::create()],
450        ApplyOrder::TopDown,
451    )
452});
453
454static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
455    OptimizationStage::new(
456        "Push Down Limit",
457        vec![LimitPushDownRule::create()],
458        ApplyOrder::TopDown,
459    )
460});
461
462static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
463    OptimizationStage::new(
464        "Pull Up Hop",
465        vec![PullUpHopRule::create()],
466        ApplyOrder::BottomUp,
467    )
468});
469
470static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
471    OptimizationStage::new(
472        "Set Operation To Join",
473        vec![
474            IntersectToSemiJoinRule::create(),
475            ExceptToAntiJoinRule::create(),
476        ],
477        ApplyOrder::BottomUp,
478    )
479});
480
481static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
482    OptimizationStage::new(
483        "Grouping Sets",
484        vec![
485            GroupingSetsToExpandRule::create(),
486            ExpandToProjectRule::create(),
487        ],
488        ApplyOrder::TopDown,
489    )
490});
491
492static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
493    OptimizationStage::new(
494        "Common Sub Expression Extract",
495        vec![CommonSubExprExtractRule::create()],
496        ApplyOrder::TopDown,
497    )
498});
499
500static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
501    OptimizationStage::new(
502        "Logical Filter Expression Simplify",
503        vec![LogicalFilterExpressionSimplifyRule::create()],
504        ApplyOrder::TopDown,
505    )
506});
507
508static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
509    OptimizationStage::new(
510        "Rewrite Source For Batch",
511        vec![
512            SourceToKafkaScanRule::create(),
513            SourceToIcebergScanRule::create(),
514        ],
515        ApplyOrder::TopDown,
516    )
517});
518
519static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
520    OptimizationStage::new(
521        "TopN to Vector Search",
522        vec![TopNToVectorSearchRule::create()],
523        ApplyOrder::BottomUp,
524    )
525});
526
527impl LogicalOptimizer {
528    pub fn predicate_pushdown(
529        plan: LogicalPlanRef,
530        explain_trace: bool,
531        ctx: &OptimizerContextRef,
532    ) -> LogicalPlanRef {
533        let plan = plan.predicate_pushdown(
534            Condition::true_cond(),
535            &mut PredicatePushdownContext::new(plan.clone()),
536        );
537        if explain_trace {
538            ctx.trace("Predicate Push Down:");
539            ctx.trace(plan.explain_to_string());
540        }
541        plan
542    }
543
544    pub fn subquery_unnesting(
545        mut plan: LogicalPlanRef,
546        enable_share_plan: bool,
547        explain_trace: bool,
548        ctx: &OptimizerContextRef,
549    ) -> Result<LogicalPlanRef> {
550        // Bail our if no apply operators.
551        if !has_logical_apply(plan.clone()) {
552            return Ok(plan);
553        }
554        // Simple Unnesting.
555        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
556        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
557        // Predicate push down before translate apply, because we need to calculate the domain
558        // and predicate push down can reduce the size of domain.
559        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
560        // In order to unnest values with correlated input ref, we need to extract project first.
561        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
562        // General Unnesting.
563        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
564        // join.
565        plan = if enable_share_plan {
566            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
567        } else {
568            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
569        };
570        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
571
572        // Check if all `Apply`s are eliminated and the subquery is unnested.
573        plan.check_apply_elimination()?;
574
575        Ok(plan)
576    }
577
578    pub fn column_pruning(
579        mut plan: LogicalPlanRef,
580        explain_trace: bool,
581        ctx: &OptimizerContextRef,
582    ) -> LogicalPlanRef {
583        let required_cols = (0..plan.schema().len()).collect_vec();
584        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
585        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
586        // Column pruning may introduce additional projects, and filter can be pushed again.
587        if explain_trace {
588            ctx.trace("Prune Columns:");
589            ctx.trace(plan.explain_to_string());
590        }
591
592        if column_pruning_ctx.need_second_round() {
593            // Second round of column pruning and reuse the column pruning context.
594            // Try to replace original share operator with the new one.
595            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
596            if explain_trace {
597                ctx.trace("Prune Columns (For DAG):");
598                ctx.trace(plan.explain_to_string());
599            }
600        }
601        plan
602    }
603
604    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
605        // If now() and proctime() are not found, bail out.
606        let mut v = NowProcTimeFinder::default();
607        plan.visit_exprs_recursive(&mut v);
608        if !v.has() {
609            return plan;
610        }
611
612        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
613
614        let plan = plan.rewrite_exprs_recursive(&mut v);
615
616        if ctx.is_explain_trace() {
617            ctx.trace("Inline Now and ProcTime:");
618            ctx.trace(plan.explain_to_string());
619        }
620        plan
621    }
622
623    pub fn gen_optimized_logical_plan_for_stream(
624        mut plan: LogicalPlanRef,
625    ) -> Result<LogicalPlanRef> {
626        let ctx = plan.ctx();
627        let explain_trace = ctx.is_explain_trace();
628
629        if explain_trace {
630            ctx.trace("Begin:");
631            ctx.trace(plan.explain_to_string());
632        }
633
634        // Convert grouping sets at first because other agg rule can't handle grouping sets.
635        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
636        // Remove nodes with constant output.
637        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
638        // Remove project to make common sub-plan sharing easier.
639        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
640
641        // If share plan is disable, we need to remove all the share operator generated by the
642        // binder, e.g. CTE and View. However, we still need to share source to ensure self
643        // source join can return correct result.
644        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
645        if enable_share_plan {
646            // Common sub-plan sharing.
647            plan = plan.common_subplan_sharing();
648            plan = plan.prune_share();
649            if explain_trace {
650                ctx.trace("Common Sub-plan Sharing:");
651                ctx.trace(plan.explain_to_string());
652            }
653        } else {
654            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
655
656            // Replace source to share source.
657            // Perform share source at the beginning so that we can benefit from predicate pushdown
658            // and column pruning for the share operator.
659            plan = ShareSourceRewriter::share_source(plan);
660            if explain_trace {
661                ctx.trace("Share Source:");
662                ctx.trace(plan.explain_to_string());
663            }
664        }
665        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
666        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
667        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
668        // Should be applied before converting table function to project set.
669        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
670        // In order to unnest a table function, we need to convert it into a `project_set` first.
671        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
672
673        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
674        if has_logical_max_one_row(plan.clone()) {
675            // `MaxOneRow` is currently only used for the runtime check of
676            // scalar subqueries, while it's not supported in streaming mode, so
677            // we raise a precise error here.
678            bail!("Scalar subquery might produce more than one row.");
679        }
680
681        // Same to batch plan optimization, this rule shall be applied before
682        // predicate push down
683        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
684
685        // Predicate Push-down
686        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
687
688        if plan.ctx().session_ctx().config().enable_join_ordering() {
689            // Merge inner joins and intermediate filters into multijoin
690            // This rule assumes that filters have already been pushed down near to
691            // their relevant joins.
692            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
693
694            // Reorder multijoin into join tree.
695            if plan
696                .ctx()
697                .session_ctx()
698                .config()
699                .streaming_enable_bushy_join()
700            {
701                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
702            } else {
703                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
704            }
705        }
706
707        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
708        // conditions into a filter above the multijoin.
709        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
710
711        // For stream, push down predicates with now into a left-semi join
712        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
713
714        // Push down the calculation of inputs of join's condition.
715        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
716
717        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
718        // Must push down predicates again after split over window so that OverWindow can be
719        // optimized to TopN.
720        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
721        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
722        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
723
724        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
725        // TODO: better naming of the OptimizationStage
726        // Convert distinct aggregates.
727        plan = if force_split_distinct_agg {
728            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
729        } else {
730            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
731        };
732
733        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
734
735        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
736
737        // Do a final column pruning and predicate pushing down to clean up the plan.
738        plan = Self::column_pruning(plan, explain_trace, &ctx);
739        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
740
741        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
742        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
743
744        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
745
746        #[cfg(debug_assertions)]
747        InputRefValidator.validate(plan.clone());
748
749        ctx.may_store_explain_logical(&plan);
750
751        Ok(plan)
752    }
753
754    pub fn gen_optimized_logical_plan_for_batch(
755        mut plan: LogicalPlanRef,
756    ) -> Result<LogicalPlanRef> {
757        let ctx = plan.ctx();
758        let explain_trace = ctx.is_explain_trace();
759
760        if explain_trace {
761            ctx.trace("Begin:");
762            ctx.trace(plan.explain_to_string());
763        }
764
765        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
766        plan = Self::inline_now_proc_time(plan, &ctx);
767
768        // Convert the dag back to the tree, because we don't support DAG plan for batch.
769        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
770
771        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
772        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
773        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
774        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
775        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
776        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
777        // Table function should be converted into `file_scan` before `project_set`.
778        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
779        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
780        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
781        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
782        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
783        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
784        // In order to unnest a table function, we need to convert it into a `project_set` first.
785        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
786
787        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
788
789        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
790
791        // Filter simplification must be applied before predicate push-down
792        // otherwise the filter for some nodes (e.g., `LogicalScan`)
793        // may not be properly applied.
794        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
795
796        // Predicate Push-down
797        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
798        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
799
800        if plan.ctx().session_ctx().config().enable_join_ordering() {
801            // Merge inner joins and intermediate filters into multijoin
802            // This rule assumes that filters have already been pushed down near to
803            // their relevant joins.
804            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
805
806            // Reorder multijoin into left-deep join tree.
807            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
808        }
809
810        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
811        // conditions into a filter above the multijoin.
812        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
813            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
814            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
815        }
816
817        // Push down the calculation of inputs of join's condition.
818        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
819
820        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
821        // Must push down predicates again after split over window so that OverWindow can be
822        // optimized to TopN.
823        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
824            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
825            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
826        }
827        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
828        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
829
830        // Convert distinct aggregates.
831        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
832
833        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
834
835        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
836
837        // Do a final column pruning and predicate pushing down to clean up the plan.
838        plan = Self::column_pruning(plan, explain_trace, &ctx);
839        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
840            (#[allow(unused_assignments)]
841            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
842            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
843        }
844
845        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
846        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
847
848        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
849
850        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
851
852        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
853
854        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
855
856        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
857
858        #[cfg(debug_assertions)]
859        InputRefValidator.validate(plan.clone());
860
861        ctx.may_store_explain_logical(&plan);
862
863        Ok(plan)
864    }
865}