risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
/// A named group of rewrite rules together with the order in which a `HeuristicOptimizer`
/// applies them.
///
/// The convention marker `C` defaults to `Logical`.
pub struct OptimizationStage<C: ConventionMarker = Logical> {
    /// Label printed as the stage header in the explain trace.
    stage_name: String,
    /// Rules handed to the heuristic optimizer for this stage.
    rules: Vec<BoxedRule<C>>,
    /// Traversal order handed to the heuristic optimizer for this stage.
    apply_order: ApplyOrder,
}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Namespace type for the logical optimization passes; it has no fields.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply internal source backfill progress rule next
141            TableFunctionToInternalSourceBackfillProgressRule::create(),
142            // Apply internal get channel delta stats rule next
143            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144            // Apply postgres query rule next
145            TableFunctionToPostgresQueryRule::create(),
146            // Apply mysql query rule next
147            TableFunctionToMySqlQueryRule::create(),
148            // Apply project set rule last
149            TableFunctionToProjectSetRule::create(),
150        ],
151        ApplyOrder::TopDown,
152    )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156    OptimizationStage::new(
157        "Table Function To FileScan",
158        vec![TableFunctionToFileScanRule::create()],
159        ApplyOrder::TopDown,
160    )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164    OptimizationStage::new(
165        "Table Function To PostgresQuery",
166        vec![TableFunctionToPostgresQueryRule::create()],
167        ApplyOrder::TopDown,
168    )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172    OptimizationStage::new(
173        "Table Function To MySQL",
174        vec![TableFunctionToMySqlQueryRule::create()],
175        ApplyOrder::TopDown,
176    )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180    LazyLock::new(|| {
181        OptimizationStage::new(
182            "Table Function To Internal Backfill Progress",
183            vec![TableFunctionToInternalBackfillProgressRule::create()],
184            ApplyOrder::TopDown,
185        )
186    });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189    LazyLock::new(|| {
190        OptimizationStage::new(
191            "Table Function To Internal Source Backfill Progress",
192            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193            ApplyOrder::TopDown,
194        )
195    });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198    LazyLock::new(|| {
199        OptimizationStage::new(
200            "Table Function To Internal Get Channel Delta Stats",
201            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202            ApplyOrder::TopDown,
203        )
204    });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207    OptimizationStage::new(
208        "Values Extract Project",
209        vec![ValuesExtractProjectRule::create()],
210        ApplyOrder::TopDown,
211    )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215    OptimizationStage::new(
216        "Simple Unnesting",
217        vec![
218            // Pull correlated predicates up the algebra tree to unnest simple subquery.
219            PullUpCorrelatedPredicateRule::create(),
220            // Pull correlated project expressions with values to inline scalar subqueries.
221            PullUpCorrelatedProjectValueRule::create(),
222            PullUpCorrelatedPredicateAggRule::create(),
223            // Eliminate max one row
224            MaxOneRowEliminateRule::create(),
225            // Convert apply to join.
226            ApplyToJoinRule::create(),
227        ],
228        ApplyOrder::BottomUp,
229    )
230});
231
232static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
233    OptimizationStage::new(
234        "Set Operation Merge",
235        vec![
236            UnionMergeRule::create(),
237            IntersectMergeRule::create(),
238            ExceptMergeRule::create(),
239        ],
240        ApplyOrder::BottomUp,
241    )
242});
243
244static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
245    LazyLock::new(|| {
246        OptimizationStage::new(
247            "General Unnesting(Translate Apply)",
248            vec![TranslateApplyRule::create(true)],
249            ApplyOrder::TopDown,
250        )
251    });
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
254    LazyLock::new(|| {
255        OptimizationStage::new(
256            "General Unnesting(Translate Apply)",
257            vec![TranslateApplyRule::create(false)],
258            ApplyOrder::TopDown,
259        )
260    });
261
262static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
263    OptimizationStage::new(
264        "General Unnesting(Push Down Apply)",
265        vec![
266            ApplyEliminateRule::create(),
267            ApplyAggTransposeRule::create(),
268            ApplyDedupTransposeRule::create(),
269            ApplyFilterTransposeRule::create(),
270            ApplyProjectTransposeRule::create(),
271            ApplyProjectSetTransposeRule::create(),
272            ApplyTopNTransposeRule::create(),
273            ApplyLimitTransposeRule::create(),
274            ApplyJoinTransposeRule::create(),
275            ApplyUnionTransposeRule::create(),
276            ApplyOverWindowTransposeRule::create(),
277            ApplyExpandTransposeRule::create(),
278            ApplyHopWindowTransposeRule::create(),
279            CrossJoinEliminateRule::create(),
280            ApplyShareEliminateRule::create(),
281        ],
282        ApplyOrder::TopDown,
283    )
284});
285
286static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287    OptimizationStage::new(
288        "To MultiJoin",
289        vec![MergeMultiJoinRule::create()],
290        ApplyOrder::TopDown,
291    )
292});
293
294static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
295    OptimizationStage::new(
296        "Join Ordering".to_owned(),
297        vec![LeftDeepTreeJoinOrderingRule::create()],
298        ApplyOrder::TopDown,
299    )
300});
301
302static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
303    OptimizationStage::new(
304        "Join Ordering".to_owned(),
305        vec![BushyTreeJoinOrderingRule::create()],
306        ApplyOrder::TopDown,
307    )
308});
309
310static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
311    OptimizationStage::new(
312        "Push down filter with now into a left semijoin",
313        vec![
314            SplitNowAndRule::create(),
315            SplitNowOrRule::create(),
316            FilterWithNowToJoinRule::create(),
317        ],
318        ApplyOrder::TopDown,
319    )
320});
321
322static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
323    OptimizationStage::new(
324        "Push down the calculation of inputs of join's condition",
325        vec![PushCalculationOfJoinRule::create()],
326        ApplyOrder::TopDown,
327    )
328});
329
330static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
331    OptimizationStage::new(
332        "Convert Distinct Aggregation",
333        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
334        ApplyOrder::TopDown,
335    )
336});
337
338static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
339    OptimizationStage::new(
340        "Convert Distinct Aggregation",
341        vec![
342            UnionToDistinctRule::create(),
343            DistinctAggRule::create(false),
344        ],
345        ApplyOrder::TopDown,
346    )
347});
348
349static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
350    OptimizationStage::new(
351        "Simplify Aggregation",
352        vec![
353            AggGroupBySimplifyRule::create(),
354            AggCallMergeRule::create(),
355            UnifyFirstLastValueRule::create(),
356        ],
357        ApplyOrder::TopDown,
358    )
359});
360
361static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
362    OptimizationStage::new(
363        "Join Commute".to_owned(),
364        vec![JoinCommuteRule::create()],
365        ApplyOrder::TopDown,
366    )
367});
368
369static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
370    OptimizationStage::new(
371        "Constant Output Operator Remove",
372        vec![EmptyAggRemoveRule::create()],
373        ApplyOrder::TopDown,
374    )
375});
376
377static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
378    OptimizationStage::new(
379        "Project Remove",
380        vec![
381            // merge should be applied before eliminate
382            ProjectMergeRule::create(),
383            ProjectEliminateRule::create(),
384            TrivialProjectToValuesRule::create(),
385            UnionInputValuesMergeRule::create(),
386            JoinProjectTransposeRule::create(),
387            // project-join merge should be applied after merge
388            // eliminate and to values
389            ProjectJoinMergeRule::create(),
390            AggProjectMergeRule::create(),
391        ],
392        ApplyOrder::BottomUp,
393    )
394});
395
396static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
397    OptimizationStage::new(
398        "Split Over Window",
399        vec![OverWindowSplitRule::create()],
400        ApplyOrder::TopDown,
401    )
402});
403
404// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
405// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
406// 2. should be after merge the multiple projects
407static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
408    OptimizationStage::new(
409        "Convert Over Window",
410        vec![
411            ProjectMergeRule::create(),
412            ProjectEliminateRule::create(),
413            TrivialProjectToValuesRule::create(),
414            UnionInputValuesMergeRule::create(),
415            OverWindowToAggAndJoinRule::create(),
416            OverWindowToTopNRule::create(),
417        ],
418        ApplyOrder::TopDown,
419    )
420});
421
422// DataFusion cannot apply `OverWindowToTopNRule`
423static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
424    OptimizationStage::new(
425        "Convert Over Window",
426        vec![
427            ProjectMergeRule::create(),
428            ProjectEliminateRule::create(),
429            TrivialProjectToValuesRule::create(),
430            UnionInputValuesMergeRule::create(),
431            OverWindowToAggAndJoinRule::create(),
432        ],
433        ApplyOrder::TopDown,
434    )
435});
436
437static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
438    OptimizationStage::new(
439        "Merge Over Window",
440        vec![OverWindowMergeRule::create()],
441        ApplyOrder::TopDown,
442    )
443});
444
445static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
446    OptimizationStage::new(
447        "Rewrite Like Expr",
448        vec![RewriteLikeExprRule::create()],
449        ApplyOrder::TopDown,
450    )
451});
452
453static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
454    OptimizationStage::new(
455        "TopN/SimpleAgg on Index",
456        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
457        ApplyOrder::TopDown,
458    )
459});
460
461static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
462    OptimizationStage::new(
463        "Void always-false filter's downstream",
464        vec![AlwaysFalseFilterRule::create()],
465        ApplyOrder::TopDown,
466    )
467});
468
469static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
470    OptimizationStage::new(
471        "Push Down Limit",
472        vec![LimitPushDownRule::create()],
473        ApplyOrder::TopDown,
474    )
475});
476
477static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
478    OptimizationStage::new(
479        "Pull Up Hop",
480        vec![PullUpHopRule::create()],
481        ApplyOrder::BottomUp,
482    )
483});
484
485static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
486    OptimizationStage::new(
487        "Set Operation To Join",
488        vec![
489            IntersectToSemiJoinRule::create(),
490            ExceptToAntiJoinRule::create(),
491        ],
492        ApplyOrder::BottomUp,
493    )
494});
495
496static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
497    OptimizationStage::new(
498        "Grouping Sets",
499        vec![
500            GroupingSetsToExpandRule::create(),
501            ExpandToProjectRule::create(),
502        ],
503        ApplyOrder::TopDown,
504    )
505});
506
507static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
508    OptimizationStage::new(
509        "Common Sub Expression Extract",
510        vec![CommonSubExprExtractRule::create()],
511        ApplyOrder::TopDown,
512    )
513});
514
515static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
516    OptimizationStage::new(
517        "Logical Filter Expression Simplify",
518        vec![LogicalFilterExpressionSimplifyRule::create()],
519        ApplyOrder::TopDown,
520    )
521});
522
523static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
524    OptimizationStage::new(
525        "Rewrite Source For Batch",
526        vec![
527            SourceToKafkaScanRule::create(),
528            SourceToIcebergScanRule::create(),
529        ],
530        ApplyOrder::TopDown,
531    )
532});
533
534static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
535    OptimizationStage::new(
536        "TopN to Vector Search",
537        vec![TopNToVectorSearchRule::create()],
538        ApplyOrder::BottomUp,
539    )
540});
541
542static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
543    LazyLock::new(|| {
544        OptimizationStage::new(
545            "Correlated TopN to Vector Search",
546            vec![CorrelatedTopNToVectorSearchRule::create(true)],
547            ApplyOrder::BottomUp,
548        )
549    });
550
551static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
552    LazyLock::new(|| {
553        OptimizationStage::new(
554            "Correlated TopN to Vector Search",
555            vec![CorrelatedTopNToVectorSearchRule::create(false)],
556            ApplyOrder::BottomUp,
557        )
558    });
559
560impl LogicalOptimizer {
561    pub fn predicate_pushdown(
562        plan: LogicalPlanRef,
563        explain_trace: bool,
564        ctx: &OptimizerContextRef,
565    ) -> LogicalPlanRef {
566        let plan = plan.predicate_pushdown(
567            Condition::true_cond(),
568            &mut PredicatePushdownContext::new(plan.clone()),
569        );
570        if explain_trace {
571            ctx.trace("Predicate Push Down:");
572            ctx.trace(plan.explain_to_string());
573        }
574        plan
575    }
576
577    pub fn subquery_unnesting(
578        mut plan: LogicalPlanRef,
579        enable_share_plan: bool,
580        explain_trace: bool,
581        ctx: &OptimizerContextRef,
582    ) -> Result<LogicalPlanRef> {
583        // Bail our if no apply operators.
584        if !has_logical_apply(plan.clone()) {
585            return Ok(plan);
586        }
587        // Simple Unnesting.
588        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
589        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
590        // Predicate push down before translate apply, because we need to calculate the domain
591        // and predicate push down can reduce the size of domain.
592        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
593        // In order to unnest values with correlated input ref, we need to extract project first.
594        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
595        // General Unnesting.
596        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
597        // join.
598        plan = if enable_share_plan {
599            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
600        } else {
601            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
602        };
603        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
604
605        // Check if all `Apply`s are eliminated and the subquery is unnested.
606        plan.check_apply_elimination()?;
607
608        Ok(plan)
609    }
610
611    pub fn column_pruning(
612        mut plan: LogicalPlanRef,
613        explain_trace: bool,
614        ctx: &OptimizerContextRef,
615    ) -> LogicalPlanRef {
616        let required_cols = (0..plan.schema().len()).collect_vec();
617        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
618        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
619        // Column pruning may introduce additional projects, and filter can be pushed again.
620        if explain_trace {
621            ctx.trace("Prune Columns:");
622            ctx.trace(plan.explain_to_string());
623        }
624
625        if column_pruning_ctx.need_second_round() {
626            // Second round of column pruning and reuse the column pruning context.
627            // Try to replace original share operator with the new one.
628            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
629            if explain_trace {
630                ctx.trace("Prune Columns (For DAG):");
631                ctx.trace(plan.explain_to_string());
632            }
633        }
634        plan
635    }
636
637    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
638        // If now() and proctime() are not found, bail out.
639        let mut v = NowProcTimeFinder::default();
640        plan.visit_exprs_recursive(&mut v);
641        if !v.has() {
642            return plan;
643        }
644
645        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
646
647        let plan = plan.rewrite_exprs_recursive(&mut v);
648
649        if ctx.is_explain_trace() {
650            ctx.trace("Inline Now and ProcTime:");
651            ctx.trace(plan.explain_to_string());
652        }
653        plan
654    }
655
    /// Runs the logical optimization pipeline for streaming plans.
    ///
    /// The stages are applied strictly in the order written below; several of them rely on
    /// the effects of earlier ones (see the inline comments). Behaviour is steered by the
    /// session configuration read from the plan's context: `enable_share_plan`,
    /// `enable_join_ordering`, `streaming_enable_bushy_join` and `force_split_distinct_agg`.
    ///
    /// # Errors
    ///
    /// Returns an error if any rule stage or subquery unnesting fails, or if a logical
    /// `MaxOneRow` is still present after subquery unnesting.
    pub fn gen_optimized_logical_plan_for_stream(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert grouping sets at first because other agg rule can't handle grouping sets.
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        // Remove nodes with constant output.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        // Remove project to make common sub-plan sharing easier.
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        // If share plan is disabled, we need to remove all the share operators generated by the
        // binder, e.g. CTE and View. However, we still need to share source to ensure self
        // source join can return correct result.
        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
        if enable_share_plan {
            // Common sub-plan sharing.
            plan = plan.common_subplan_sharing();
            plan = plan.prune_share();
            if explain_trace {
                ctx.trace("Common Sub-plan Sharing:");
                ctx.trace(plan.explain_to_string());
            }
        } else {
            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

            // Replace source to share source.
            // Perform share source at the beginning so that we can benefit from predicate pushdown
            // and column pruning for the share operator.
            plan = ShareSourceRewriter::share_source(plan);
            if explain_trace {
                ctx.trace("Share Source:");
                ctx.trace(plan.explain_to_string());
            }
        }
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
        // Should be applied before converting table function to project set.
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        if has_logical_max_one_row(plan.clone()) {
            // `MaxOneRow` is currently only used for the runtime check of
            // scalar subqueries, while it's not supported in streaming mode, so
            // we raise a precise error here.
            bail!("Scalar subquery might produce more than one row.");
        }

        // Same to batch plan optimization, this rule shall be applied before
        // predicate push down
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into join tree.
            if plan
                .ctx()
                .session_ctx()
                .config()
                .streaming_enable_bushy_join()
            {
                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
            } else {
                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
            }
        }

        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
        // conditions into a filter above the multijoin.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // For stream, push down predicates with now into a left-semi join
        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
        // TODO: better naming of the OptimizationStage
        // Convert distinct aggregates. Note that forcing the split selects the stage named
        // `..._FOR_BATCH` even though this is the streaming pipeline.
        plan = if force_split_distinct_agg {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
        } else {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
        };

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // Input ref validation is compiled in for debug builds only.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
788
    /// Runs the logical optimization pipeline for batch plans.
    ///
    /// The stages are applied strictly in the order written below. Unlike the streaming
    /// pipeline, the repeated predicate push downs are skipped when the context's total
    /// applied-rule counter has not changed since the value recorded at the previous push
    /// down, and subquery unnesting is always run with plan sharing disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if any rule stage or subquery unnesting fails.
    pub fn gen_optimized_logical_plan_for_batch(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
        plan = Self::inline_now_proc_time(plan, &ctx);

        // Convert the dag back to the tree, because we don't support DAG plan for batch.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
        // Table function should be converted into `file_scan` before `project_set`.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;

        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;

        // Filter simplification must be applied before predicate push-down
        // otherwise the filter for some nodes (e.g., `LogicalScan`)
        // may not be properly applied.
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        // Record the applied-rule counter so that later push downs can be skipped when no rule
        // has been applied in between.
        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into left-deep join tree.
            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
        }

        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
        // conditions into a filter above the multijoin.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        // Convert distinct aggregates.
        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            // The counter is not read again after this point, hence the
            // `unused_assignments` allowance; the assignment keeps this block uniform with
            // the earlier ones.
            (#[allow(unused_assignments)]
            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;

        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;

        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;

        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        // Input ref validation is compiled in for debug builds only.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
902}