risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
/// A named group of rules together with the order in which they are applied.
///
/// The convention marker defaults to `Logical`.
pub struct OptimizationStage<C: ConventionMarker = Logical> {
    /// Name of the stage; used as the header line when the stage is traced.
    stage_name: String,
    /// Rules handed to the `HeuristicOptimizer` for this stage.
    rules: Vec<BoxedRule<C>>,
    /// Apply order handed to the `HeuristicOptimizer` for this stage.
    apply_order: ApplyOrder,
}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Field-less type that groups the logical optimization entry points as associated functions.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117    OptimizationStage::new(
118        "DAG To Tree",
119        vec![DagToTreeRule::create()],
120        ApplyOrder::TopDown,
121    )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125    OptimizationStage::new(
126        "Convert GENERATE_SERIES Ends With NOW",
127        vec![GenerateSeriesWithNowRule::create()],
128        ApplyOrder::TopDown,
129    )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133    OptimizationStage::new(
134        "Table Function Convert",
135        vec![
136            // Apply file scan rule first
137            TableFunctionToFileScanRule::create(),
138            // Apply internal backfill progress rule first
139            TableFunctionToInternalBackfillProgressRule::create(),
140            // Apply internal source backfill progress rule next
141            TableFunctionToInternalSourceBackfillProgressRule::create(),
142            // Apply internal get channel delta stats rule next
143            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144            // Apply postgres query rule next
145            TableFunctionToPostgresQueryRule::create(),
146            // Apply mysql query rule next
147            TableFunctionToMySqlQueryRule::create(),
148            // Apply project set rule last
149            TableFunctionToProjectSetRule::create(),
150        ],
151        ApplyOrder::TopDown,
152    )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156    OptimizationStage::new(
157        "Table Function To FileScan",
158        vec![TableFunctionToFileScanRule::create()],
159        ApplyOrder::TopDown,
160    )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164    OptimizationStage::new(
165        "Table Function To PostgresQuery",
166        vec![TableFunctionToPostgresQueryRule::create()],
167        ApplyOrder::TopDown,
168    )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172    OptimizationStage::new(
173        "Table Function To MySQL",
174        vec![TableFunctionToMySqlQueryRule::create()],
175        ApplyOrder::TopDown,
176    )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180    LazyLock::new(|| {
181        OptimizationStage::new(
182            "Table Function To Internal Backfill Progress",
183            vec![TableFunctionToInternalBackfillProgressRule::create()],
184            ApplyOrder::TopDown,
185        )
186    });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189    LazyLock::new(|| {
190        OptimizationStage::new(
191            "Table Function To Internal Source Backfill Progress",
192            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193            ApplyOrder::TopDown,
194        )
195    });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198    LazyLock::new(|| {
199        OptimizationStage::new(
200            "Table Function To Internal Get Channel Delta Stats",
201            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202            ApplyOrder::TopDown,
203        )
204    });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207    OptimizationStage::new(
208        "Values Extract Project",
209        vec![ValuesExtractProjectRule::create()],
210        ApplyOrder::TopDown,
211    )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215    OptimizationStage::new(
216        "Simple Unnesting",
217        vec![
218            // Pull correlated predicates up the algebra tree to unnest simple subquery.
219            PullUpCorrelatedPredicateRule::create(),
220            // Pull correlated project expressions with values to inline scalar subqueries.
221            PullUpCorrelatedProjectValueRule::create(),
222            PullUpCorrelatedPredicateAggRule::create(),
223            // Eliminate max one row
224            MaxOneRowEliminateRule::create(),
225            // Eliminate lateral table-function apply into a unary ProjectSet.
226            ApplyTableFunctionToProjectSetRule::create(),
227            // Convert apply to join.
228            ApplyToJoinRule::create(),
229        ],
230        ApplyOrder::BottomUp,
231    )
232});
233
234static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
235    OptimizationStage::new(
236        "Set Operation Merge",
237        vec![
238            UnionMergeRule::create(),
239            IntersectMergeRule::create(),
240            ExceptMergeRule::create(),
241        ],
242        ApplyOrder::BottomUp,
243    )
244});
245
246static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
247    LazyLock::new(|| {
248        OptimizationStage::new(
249            "General Unnesting(Translate Apply)",
250            vec![TranslateApplyRule::create(true)],
251            ApplyOrder::TopDown,
252        )
253    });
254
255static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
256    LazyLock::new(|| {
257        OptimizationStage::new(
258            "General Unnesting(Translate Apply)",
259            vec![TranslateApplyRule::create(false)],
260            ApplyOrder::TopDown,
261        )
262    });
263
264static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
265    OptimizationStage::new(
266        "General Unnesting(Push Down Apply)",
267        vec![
268            ApplyEliminateRule::create(),
269            ApplyAggTransposeRule::create(),
270            ApplyDedupTransposeRule::create(),
271            ApplyFilterTransposeRule::create(),
272            ApplyProjectTransposeRule::create(),
273            ApplyProjectSetTransposeRule::create(),
274            ApplyTopNTransposeRule::create(),
275            ApplyLimitTransposeRule::create(),
276            ApplyJoinTransposeRule::create(),
277            ApplyUnionTransposeRule::create(),
278            ApplyOverWindowTransposeRule::create(),
279            ApplyExpandTransposeRule::create(),
280            ApplyHopWindowTransposeRule::create(),
281            CrossJoinEliminateRule::create(),
282            ApplyShareEliminateRule::create(),
283        ],
284        ApplyOrder::TopDown,
285    )
286});
287
288static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
289    OptimizationStage::new(
290        "To MultiJoin",
291        vec![MergeMultiJoinRule::create()],
292        ApplyOrder::TopDown,
293    )
294});
295
296static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
297    OptimizationStage::new(
298        "Join Ordering".to_owned(),
299        vec![LeftDeepTreeJoinOrderingRule::create()],
300        ApplyOrder::TopDown,
301    )
302});
303
304static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
305    OptimizationStage::new(
306        "Join Ordering".to_owned(),
307        vec![BushyTreeJoinOrderingRule::create()],
308        ApplyOrder::TopDown,
309    )
310});
311
312static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
313    OptimizationStage::new(
314        "Push down filter with now into a left semijoin",
315        vec![
316            SplitNowAndRule::create(),
317            SplitNowOrRule::create(),
318            FilterWithNowToJoinRule::create(),
319        ],
320        ApplyOrder::TopDown,
321    )
322});
323
324static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
325    OptimizationStage::new(
326        "Push down the calculation of inputs of join's condition",
327        vec![PushCalculationOfJoinRule::create()],
328        ApplyOrder::TopDown,
329    )
330});
331
332static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
333    OptimizationStage::new(
334        "Convert Distinct Aggregation",
335        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
336        ApplyOrder::TopDown,
337    )
338});
339
340static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
341    OptimizationStage::new(
342        "Convert Distinct Aggregation",
343        vec![
344            UnionToDistinctRule::create(),
345            DistinctAggRule::create(false),
346        ],
347        ApplyOrder::TopDown,
348    )
349});
350
351static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
352    OptimizationStage::new(
353        "Simplify Aggregation",
354        vec![
355            AggGroupBySimplifyRule::create(),
356            AggCallMergeRule::create(),
357            UnifyFirstLastValueRule::create(),
358        ],
359        ApplyOrder::TopDown,
360    )
361});
362
363static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
364    OptimizationStage::new(
365        "Join Commute".to_owned(),
366        vec![JoinCommuteRule::create()],
367        ApplyOrder::TopDown,
368    )
369});
370
371static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
372    OptimizationStage::new(
373        "Constant Output Operator Remove",
374        vec![EmptyAggRemoveRule::create()],
375        ApplyOrder::TopDown,
376    )
377});
378
379static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
380    OptimizationStage::new(
381        "Project Remove",
382        vec![
383            // merge should be applied before eliminate
384            ProjectMergeRule::create(),
385            ProjectEliminateRule::create(),
386            TrivialProjectToValuesRule::create(),
387            UnionInputValuesMergeRule::create(),
388            JoinProjectTransposeRule::create(),
389            // project-join merge should be applied after merge
390            // eliminate and to values
391            ProjectJoinMergeRule::create(),
392            AggProjectMergeRule::create(),
393        ],
394        ApplyOrder::BottomUp,
395    )
396});
397
398static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
399    OptimizationStage::new(
400        "Split Over Window",
401        vec![OverWindowSplitRule::create()],
402        ApplyOrder::TopDown,
403    )
404});
405
406// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
407// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
408// 2. should be after merge the multiple projects
409static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
410    OptimizationStage::new(
411        "Convert Over Window",
412        vec![
413            ProjectMergeRule::create(),
414            ProjectEliminateRule::create(),
415            TrivialProjectToValuesRule::create(),
416            UnionInputValuesMergeRule::create(),
417            OverWindowToAggAndJoinRule::create(),
418            OverWindowToTopNRule::create(),
419        ],
420        ApplyOrder::TopDown,
421    )
422});
423
424// DataFusion cannot apply `OverWindowToTopNRule`
425static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
426    OptimizationStage::new(
427        "Convert Over Window",
428        vec![
429            ProjectMergeRule::create(),
430            ProjectEliminateRule::create(),
431            TrivialProjectToValuesRule::create(),
432            UnionInputValuesMergeRule::create(),
433            OverWindowToAggAndJoinRule::create(),
434        ],
435        ApplyOrder::TopDown,
436    )
437});
438
439static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
440    OptimizationStage::new(
441        "Merge Over Window",
442        vec![OverWindowMergeRule::create()],
443        ApplyOrder::TopDown,
444    )
445});
446
447static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
448    OptimizationStage::new(
449        "Rewrite Like Expr",
450        vec![RewriteLikeExprRule::create()],
451        ApplyOrder::TopDown,
452    )
453});
454
455static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
456    OptimizationStage::new(
457        "TopN/SimpleAgg on Index",
458        vec![
459            TopNProjectTransposeRule::create(),
460            TopNOnIndexRule::create(),
461            MinMaxOnIndexRule::create(),
462        ],
463        ApplyOrder::TopDown,
464    )
465});
466
467static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
468    OptimizationStage::new(
469        "Project TopN Transpose",
470        vec![ProjectTopNTransposeRule::create()],
471        ApplyOrder::TopDown,
472    )
473});
474
475static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
476    OptimizationStage::new(
477        "Void always-false filter's downstream",
478        vec![AlwaysFalseFilterRule::create()],
479        ApplyOrder::TopDown,
480    )
481});
482
483static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
484    OptimizationStage::new(
485        "Push Down Limit",
486        vec![LimitPushDownRule::create()],
487        ApplyOrder::TopDown,
488    )
489});
490
491static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
492    OptimizationStage::new(
493        "Pull Up Hop",
494        vec![PullUpHopRule::create()],
495        ApplyOrder::BottomUp,
496    )
497});
498
499static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
500    OptimizationStage::new(
501        "Set Operation To Join",
502        vec![
503            IntersectToSemiJoinRule::create(),
504            ExceptToAntiJoinRule::create(),
505        ],
506        ApplyOrder::BottomUp,
507    )
508});
509
510static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
511    OptimizationStage::new(
512        "Grouping Sets",
513        vec![
514            GroupingSetsToExpandRule::create(),
515            ExpandToProjectRule::create(),
516        ],
517        ApplyOrder::TopDown,
518    )
519});
520
521static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
522    OptimizationStage::new(
523        "Common Sub Expression Extract",
524        vec![CommonSubExprExtractRule::create()],
525        ApplyOrder::TopDown,
526    )
527});
528
529static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
530    OptimizationStage::new(
531        "Logical Filter Expression Simplify",
532        vec![LogicalFilterExpressionSimplifyRule::create()],
533        ApplyOrder::TopDown,
534    )
535});
536
537static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
538    OptimizationStage::new(
539        "Rewrite Source For Batch",
540        vec![
541            SourceToKafkaScanRule::create(),
542            // For Iceberg, we use the intermediate scan to defer metadata reading
543            // until after predicate pushdown and column pruning
544            SourceToIcebergIntermediateScanRule::create(),
545        ],
546        ApplyOrder::TopDown,
547    )
548});
549
550static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
551    OptimizationStage::new(
552        "Materialize Iceberg Scan",
553        vec![IcebergIntermediateScanRule::create()],
554        ApplyOrder::TopDown,
555    )
556});
557
558static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
559    OptimizationStage::new(
560        "Iceberg Count Star Optimization",
561        vec![IcebergCountStarRule::create()],
562        ApplyOrder::BottomUp,
563    )
564});
565
566static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
567    OptimizationStage::new(
568        "TopN to Vector Search",
569        vec![TopNToVectorSearchRule::create()],
570        ApplyOrder::BottomUp,
571    )
572});
573
574static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
575    LazyLock::new(|| {
576        OptimizationStage::new(
577            "Correlated TopN to Vector Search",
578            vec![CorrelatedTopNToVectorSearchRule::create(true)],
579            ApplyOrder::BottomUp,
580        )
581    });
582
583static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
584    LazyLock::new(|| {
585        OptimizationStage::new(
586            "Correlated TopN to Vector Search",
587            vec![CorrelatedTopNToVectorSearchRule::create(false)],
588            ApplyOrder::BottomUp,
589        )
590    });
591
592impl LogicalOptimizer {
593    pub fn predicate_pushdown(
594        plan: LogicalPlanRef,
595        explain_trace: bool,
596        ctx: &OptimizerContextRef,
597    ) -> LogicalPlanRef {
598        let plan = plan.predicate_pushdown(
599            Condition::true_cond(),
600            &mut PredicatePushdownContext::new(plan.clone()),
601        );
602        if explain_trace {
603            ctx.trace("Predicate Push Down:");
604            ctx.trace(plan.explain_to_string());
605        }
606        plan
607    }
608
609    pub fn subquery_unnesting(
610        mut plan: LogicalPlanRef,
611        enable_share_plan: bool,
612        explain_trace: bool,
613        ctx: &OptimizerContextRef,
614    ) -> Result<LogicalPlanRef> {
615        // Bail our if no apply operators.
616        if !has_logical_apply(plan.clone()) {
617            return Ok(plan);
618        }
619        // Simple Unnesting.
620        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
621        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
622        // Predicate push down before translate apply, because we need to calculate the domain
623        // and predicate push down can reduce the size of domain.
624        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
625        // In order to unnest values with correlated input ref, we need to extract project first.
626        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
627        // General Unnesting.
628        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
629        // join.
630        plan = if enable_share_plan {
631            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
632        } else {
633            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
634        };
635        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
636
637        // Check if all `Apply`s are eliminated and the subquery is unnested.
638        plan.check_apply_elimination()?;
639
640        Ok(plan)
641    }
642
643    pub fn column_pruning(
644        mut plan: LogicalPlanRef,
645        explain_trace: bool,
646        ctx: &OptimizerContextRef,
647    ) -> LogicalPlanRef {
648        let required_cols = (0..plan.schema().len()).collect_vec();
649        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
650        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
651        // Column pruning may introduce additional projects, and filter can be pushed again.
652        if explain_trace {
653            ctx.trace("Prune Columns:");
654            ctx.trace(plan.explain_to_string());
655        }
656
657        if column_pruning_ctx.need_second_round() {
658            // Second round of column pruning and reuse the column pruning context.
659            // Try to replace original share operator with the new one.
660            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
661            if explain_trace {
662                ctx.trace("Prune Columns (For DAG):");
663                ctx.trace(plan.explain_to_string());
664            }
665        }
666        plan
667    }
668
669    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
670        // If now() and proctime() are not found, bail out.
671        let mut v = NowProcTimeFinder::default();
672        plan.visit_exprs_recursive(&mut v);
673        if !v.has() {
674            return plan;
675        }
676
677        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
678
679        let plan = plan.rewrite_exprs_recursive(&mut v);
680
681        if ctx.is_explain_trace() {
682            ctx.trace("Inline Now and ProcTime:");
683            ctx.trace(plan.explain_to_string());
684        }
685        plan
686    }
687
    /// Runs the logical optimization pipeline for a streaming plan and returns the
    /// optimized plan.
    ///
    /// The stages run strictly in the order written below; several of them depend on the
    /// effects of earlier ones (see the inline comments). When explain-trace is enabled on
    /// the plan's context, the initial plan and several intermediate plans are traced.
    ///
    /// # Errors
    ///
    /// Returns an error if any rule stage or the subquery unnesting fails, or if a
    /// `MaxOneRow` operator is still present after unnesting.
    pub fn gen_optimized_logical_plan_for_stream(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert grouping sets first because the other agg rules can't handle grouping sets.
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        // Remove nodes with constant output.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        // Remove projects to make common sub-plan sharing easier.
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        // If plan sharing is disabled, we need to remove all the share operators generated by
        // the binder, e.g. for CTEs and views. However, we still need to share sources to
        // ensure that a self source join returns the correct result.
        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
        if enable_share_plan {
            // Common sub-plan sharing.
            plan = plan.common_subplan_sharing();
            plan = plan.prune_share();
            if explain_trace {
                ctx.trace("Common Sub-plan Sharing:");
                ctx.trace(plan.explain_to_string());
            }
        } else {
            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

            // Replace sources with shared sources.
            // Sharing sources at the beginning lets the share operator benefit from predicate
            // pushdown and column pruning.
            plan = ShareSourceRewriter::share_source(plan);
            if explain_trace {
                ctx.trace("Share Source:");
                ctx.trace(plan.explain_to_string());
            }
        }
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        // Convert a `generate_series` that ends with `now()` into a `Now` source. Only for
        // streaming mode. Must be applied before converting table functions to project sets.
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        if has_logical_max_one_row(plan.clone()) {
            // `MaxOneRow` is currently only used for the runtime check of
            // scalar subqueries, which is not supported in streaming mode, so
            // we raise a precise error here.
            bail!("Scalar subquery might produce more than one row.");
        }

        // As in the batch plan optimization, this rule must be applied before
        // predicate pushdown.
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate pushdown.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into a multijoin.
            // This rule assumes that filters have already been pushed down close to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder the multijoin into a join tree (bushy or left-deep, per session config).
            if plan
                .ctx()
                .session_ctx()
                .config()
                .streaming_enable_bushy_join()
            {
                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
            } else {
                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
            }
        }

        // Predicate pushdown: apply the filter pushdown again since all join conditions were
        // pulled up into a filter above the multijoin.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // For streaming, push predicates containing `now` down into a left-semi join.
        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;

        // Push down the calculation of the inputs of join conditions.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Predicates must be pushed down again after splitting over windows so that
        // OverWindow can be optimized to TopN.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
        // TODO: better naming of the OptimizationStage
        // Convert distinct aggregates.
        // NOTE(review): when the split is forced, the `_FOR_BATCH` stage is used here; it
        // differs from the `_FOR_STREAM` stage only in the flag given to `DistinctAggRule`.
        plan = if force_split_distinct_agg {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
        } else {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
        };

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Do a final column pruning and predicate pushdown to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // Input-ref validation is compiled in for debug builds only.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
820
821    pub fn gen_optimized_logical_plan_for_batch(
822        mut plan: LogicalPlanRef,
823    ) -> Result<LogicalPlanRef> {
824        let ctx = plan.ctx();
825        let explain_trace = ctx.is_explain_trace();
826
827        if explain_trace {
828            ctx.trace("Begin:");
829            ctx.trace(plan.explain_to_string());
830        }
831
832        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
833        plan = Self::inline_now_proc_time(plan, &ctx);
834
835        // Convert the dag back to the tree, because we don't support DAG plan for batch.
836        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
837
838        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
839        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
840        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
841        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
842        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
843        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
844        // Table function should be converted into `file_scan` before `project_set`.
845        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
846        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
847        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
848        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
849        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
850        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
851        // In order to unnest a table function, we need to convert it into a `project_set` first.
852        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
853
854        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
855
856        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
857
858        // Filter simplification must be applied before predicate push-down
859        // otherwise the filter for some nodes (e.g., `LogicalScan`)
860        // may not be properly applied.
861        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
862
863        // Predicate Push-down
864        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
865        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
866
867        if plan.ctx().session_ctx().config().enable_join_ordering() {
868            // Merge inner joins and intermediate filters into multijoin
869            // This rule assumes that filters have already been pushed down near to
870            // their relevant joins.
871            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
872
873            // Reorder multijoin into left-deep join tree.
874            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
875        }
876
877        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
878        // conditions into a filter above the multijoin.
879        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
880            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
881            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
882        }
883
884        // Push down the calculation of inputs of join's condition.
885        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
886
887        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
888        // Must push down predicates again after split over window so that OverWindow can be
889        // optimized to TopN.
890        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
891            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
892            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
893        }
894        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
895        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
896
897        // Convert distinct aggregates.
898        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
899
900        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
901
902        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
903
904        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
905
906        // Do a final column pruning and predicate pushing down to clean up the plan.
907        plan = Self::column_pruning(plan, explain_trace, &ctx);
908        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
909            (#[allow(unused_assignments)]
910            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
911            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
912        }
913
914        // Materialize Iceberg intermediate scans after predicate pushdown and column pruning.
915        // This converts LogicalIcebergIntermediateScan to LogicalIcebergScan with anti-joins
916        // for delete files.
917        plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;
918
919        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
920        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
921
922        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
923
924        // This need to be apply after PROJECT_REMOVE to ensure there is no projection between agg and iceberg scan.
925        plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;
926
927        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
928
929        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
930
931        plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;
932
933        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
934
935        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
936
937        #[cfg(debug_assertions)]
938        InputRefValidator.validate(plan.clone());
939
940        ctx.may_store_explain_logical(&plan);
941
942        Ok(plan)
943    }
944}