risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Binder, Explain, OptimizerContextRef, Planner};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92    stage_name: String,
93    rules: Vec<BoxedRule<C>>,
94    apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::collections::{HashMap, HashSet};
111use std::sync::LazyLock;
112
113use risingwave_common::id::ObjectId;
114use risingwave_pb::common::PbObjectType;
115use risingwave_sqlparser::ast::Statement;
116
117use crate::optimizer::plan_node::generic::GenericPlanRef;
118use crate::optimizer::plan_visitor::RelationCollectorVisitor;
119use crate::session::SessionImpl;
120
/// Namespace for the logical optimization pipelines (streaming and batch).
pub struct LogicalOptimizer {}
122
123static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
124    OptimizationStage::new(
125        "DAG To Tree",
126        vec![DagToTreeRule::create()],
127        ApplyOrder::TopDown,
128    )
129});
130
131static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
132    OptimizationStage::new(
133        "Convert GENERATE_SERIES Ends With NOW",
134        vec![GenerateSeriesWithNowRule::create()],
135        ApplyOrder::TopDown,
136    )
137});
138
139static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
140    OptimizationStage::new(
141        "Table Function Convert",
142        vec![
143            // Apply file scan rule first
144            TableFunctionToFileScanRule::create(),
145            // Apply internal backfill progress rule first
146            TableFunctionToInternalBackfillProgressRule::create(),
147            // Apply internal source backfill progress rule next
148            TableFunctionToInternalSourceBackfillProgressRule::create(),
149            // Apply internal get channel delta stats rule next
150            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
151            // Apply postgres query rule next
152            TableFunctionToPostgresQueryRule::create(),
153            // Apply mysql query rule next
154            TableFunctionToMySqlQueryRule::create(),
155            // Apply project set rule last
156            TableFunctionToProjectSetRule::create(),
157        ],
158        ApplyOrder::TopDown,
159    )
160});
161
162static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
163    OptimizationStage::new(
164        "Table Function To FileScan",
165        vec![TableFunctionToFileScanRule::create()],
166        ApplyOrder::TopDown,
167    )
168});
169
170static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
171    OptimizationStage::new(
172        "Table Function To PostgresQuery",
173        vec![TableFunctionToPostgresQueryRule::create()],
174        ApplyOrder::TopDown,
175    )
176});
177
178static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
179    OptimizationStage::new(
180        "Table Function To MySQL",
181        vec![TableFunctionToMySqlQueryRule::create()],
182        ApplyOrder::TopDown,
183    )
184});
185
186static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
187    LazyLock::new(|| {
188        OptimizationStage::new(
189            "Table Function To Internal Backfill Progress",
190            vec![TableFunctionToInternalBackfillProgressRule::create()],
191            ApplyOrder::TopDown,
192        )
193    });
194
195static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
196    LazyLock::new(|| {
197        OptimizationStage::new(
198            "Table Function To Internal Source Backfill Progress",
199            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
200            ApplyOrder::TopDown,
201        )
202    });
203
204static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
205    LazyLock::new(|| {
206        OptimizationStage::new(
207            "Table Function To Internal Get Channel Delta Stats",
208            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
209            ApplyOrder::TopDown,
210        )
211    });
212
213static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
214    OptimizationStage::new(
215        "Values Extract Project",
216        vec![ValuesExtractProjectRule::create()],
217        ApplyOrder::TopDown,
218    )
219});
220
221static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
222    OptimizationStage::new(
223        "Simple Unnesting",
224        vec![
225            // Pull correlated predicates up the algebra tree to unnest simple subquery.
226            PullUpCorrelatedPredicateRule::create(),
227            // Pull correlated project expressions with values to inline scalar subqueries.
228            PullUpCorrelatedProjectValueRule::create(),
229            PullUpCorrelatedPredicateAggRule::create(),
230            // Eliminate max one row
231            MaxOneRowEliminateRule::create(),
232            // Eliminate lateral table-function apply into a unary ProjectSet.
233            ApplyTableFunctionToProjectSetRule::create(),
234            // Convert apply to join.
235            ApplyToJoinRule::create(),
236        ],
237        ApplyOrder::BottomUp,
238    )
239});
240
241static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
242    OptimizationStage::new(
243        "Set Operation Merge",
244        vec![
245            UnionMergeRule::create(),
246            IntersectMergeRule::create(),
247            ExceptMergeRule::create(),
248        ],
249        ApplyOrder::BottomUp,
250    )
251});
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
254    LazyLock::new(|| {
255        OptimizationStage::new(
256            "General Unnesting(Translate Apply)",
257            vec![TranslateApplyRule::create(true)],
258            ApplyOrder::TopDown,
259        )
260    });
261
262static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
263    LazyLock::new(|| {
264        OptimizationStage::new(
265            "General Unnesting(Translate Apply)",
266            vec![TranslateApplyRule::create(false)],
267            ApplyOrder::TopDown,
268        )
269    });
270
271static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
272    OptimizationStage::new(
273        "General Unnesting(Push Down Apply)",
274        vec![
275            ApplyEliminateRule::create(),
276            ApplyAggTransposeRule::create(),
277            ApplyDedupTransposeRule::create(),
278            ApplyFilterTransposeRule::create(),
279            ApplyProjectTransposeRule::create(),
280            ApplyProjectSetTransposeRule::create(),
281            ApplyTopNTransposeRule::create(),
282            ApplyLimitTransposeRule::create(),
283            ApplyJoinTransposeRule::create(),
284            ApplyUnionTransposeRule::create(),
285            ApplyOverWindowTransposeRule::create(),
286            ApplyExpandTransposeRule::create(),
287            ApplyHopWindowTransposeRule::create(),
288            CrossJoinEliminateRule::create(),
289            ApplyShareEliminateRule::create(),
290        ],
291        ApplyOrder::TopDown,
292    )
293});
294
295static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
296    OptimizationStage::new(
297        "To MultiJoin",
298        vec![MergeMultiJoinRule::create()],
299        ApplyOrder::TopDown,
300    )
301});
302
303static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
304    OptimizationStage::new(
305        "Join Ordering".to_owned(),
306        vec![LeftDeepTreeJoinOrderingRule::create()],
307        ApplyOrder::TopDown,
308    )
309});
310
311static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
312    OptimizationStage::new(
313        "Join Ordering".to_owned(),
314        vec![BushyTreeJoinOrderingRule::create()],
315        ApplyOrder::TopDown,
316    )
317});
318
319static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
320    OptimizationStage::new(
321        "Push down filter with now into a left semijoin",
322        vec![
323            SplitNowAndRule::create(),
324            SplitNowOrRule::create(),
325            FilterWithNowToJoinRule::create(),
326        ],
327        ApplyOrder::TopDown,
328    )
329});
330
331static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
332    OptimizationStage::new(
333        "Push down the calculation of inputs of join's condition",
334        vec![PushCalculationOfJoinRule::create()],
335        ApplyOrder::TopDown,
336    )
337});
338
339static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
340    OptimizationStage::new(
341        "Convert Distinct Aggregation",
342        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
343        ApplyOrder::TopDown,
344    )
345});
346
347static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
348    OptimizationStage::new(
349        "Convert Distinct Aggregation",
350        vec![
351            UnionToDistinctRule::create(),
352            DistinctAggRule::create(false),
353        ],
354        ApplyOrder::TopDown,
355    )
356});
357
358static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
359    OptimizationStage::new(
360        "Simplify Aggregation",
361        vec![
362            AggGroupBySimplifyRule::create(),
363            AggCallMergeRule::create(),
364            UnifyFirstLastValueRule::create(),
365        ],
366        ApplyOrder::TopDown,
367    )
368});
369
370static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
371    OptimizationStage::new(
372        "Join Commute".to_owned(),
373        vec![JoinCommuteRule::create()],
374        ApplyOrder::TopDown,
375    )
376});
377
378static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
379    OptimizationStage::new(
380        "Constant Output Operator Remove",
381        vec![EmptyAggRemoveRule::create()],
382        ApplyOrder::TopDown,
383    )
384});
385
386static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
387    OptimizationStage::new(
388        "Project Remove",
389        vec![
390            // merge should be applied before eliminate
391            ProjectMergeRule::create(),
392            ProjectEliminateRule::create(),
393            TrivialProjectToValuesRule::create(),
394            UnionInputValuesMergeRule::create(),
395            JoinProjectTransposeRule::create(),
396            // project-join merge should be applied after merge
397            // eliminate and to values
398            ProjectJoinMergeRule::create(),
399            AggProjectMergeRule::create(),
400        ],
401        ApplyOrder::BottomUp,
402    )
403});
404
405static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
406    OptimizationStage::new(
407        "Split Over Window",
408        vec![OverWindowSplitRule::create()],
409        ApplyOrder::TopDown,
410    )
411});
412
413// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
414// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
415// 2. should be after merge the multiple projects
416static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
417    OptimizationStage::new(
418        "Convert Over Window",
419        vec![
420            ProjectMergeRule::create(),
421            ProjectEliminateRule::create(),
422            TrivialProjectToValuesRule::create(),
423            UnionInputValuesMergeRule::create(),
424            OverWindowToAggAndJoinRule::create(),
425            OverWindowToTopNRule::create(),
426        ],
427        ApplyOrder::TopDown,
428    )
429});
430
431// DataFusion cannot apply `OverWindowToTopNRule`
432static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
433    OptimizationStage::new(
434        "Convert Over Window",
435        vec![
436            ProjectMergeRule::create(),
437            ProjectEliminateRule::create(),
438            TrivialProjectToValuesRule::create(),
439            UnionInputValuesMergeRule::create(),
440            OverWindowToAggAndJoinRule::create(),
441        ],
442        ApplyOrder::TopDown,
443    )
444});
445
446static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
447    OptimizationStage::new(
448        "Merge Over Window",
449        vec![OverWindowMergeRule::create()],
450        ApplyOrder::TopDown,
451    )
452});
453
454static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
455    OptimizationStage::new(
456        "Rewrite Like Expr",
457        vec![RewriteLikeExprRule::create()],
458        ApplyOrder::TopDown,
459    )
460});
461
462static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
463    OptimizationStage::new(
464        "TopN/SimpleAgg on Index",
465        vec![
466            TopNProjectTransposeRule::create(),
467            TopNOnIndexRule::create(),
468            MinMaxOnIndexRule::create(),
469        ],
470        ApplyOrder::TopDown,
471    )
472});
473
474static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
475    OptimizationStage::new(
476        "Project TopN Transpose",
477        vec![ProjectTopNTransposeRule::create()],
478        ApplyOrder::TopDown,
479    )
480});
481
482static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
483    OptimizationStage::new(
484        "Void always-false filter's downstream",
485        vec![AlwaysFalseFilterRule::create()],
486        ApplyOrder::TopDown,
487    )
488});
489
490static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
491    OptimizationStage::new(
492        "Push Down Limit",
493        vec![LimitPushDownRule::create()],
494        ApplyOrder::TopDown,
495    )
496});
497
498static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
499    OptimizationStage::new(
500        "Pull Up Hop",
501        vec![PullUpHopRule::create()],
502        ApplyOrder::BottomUp,
503    )
504});
505
506static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
507    OptimizationStage::new(
508        "Set Operation To Join",
509        vec![
510            IntersectToSemiJoinRule::create(),
511            ExceptToAntiJoinRule::create(),
512        ],
513        ApplyOrder::BottomUp,
514    )
515});
516
517static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
518    OptimizationStage::new(
519        "Grouping Sets",
520        vec![
521            GroupingSetsToExpandRule::create(),
522            ExpandToProjectRule::create(),
523        ],
524        ApplyOrder::TopDown,
525    )
526});
527
528static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
529    OptimizationStage::new(
530        "Common Sub Expression Extract",
531        vec![CommonSubExprExtractRule::create()],
532        ApplyOrder::TopDown,
533    )
534});
535
536static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
537    OptimizationStage::new(
538        "Logical Filter Expression Simplify",
539        vec![LogicalFilterExpressionSimplifyRule::create()],
540        ApplyOrder::TopDown,
541    )
542});
543
544static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
545    OptimizationStage::new(
546        "Rewrite Source For Batch",
547        vec![
548            SourceToKafkaScanRule::create(),
549            // For Iceberg, we use the intermediate scan to defer metadata reading
550            // until after predicate pushdown and column pruning
551            SourceToIcebergIntermediateScanRule::create(),
552        ],
553        ApplyOrder::TopDown,
554    )
555});
556
557static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
558    OptimizationStage::new(
559        "Materialize Iceberg Scan",
560        vec![IcebergIntermediateScanRule::create()],
561        ApplyOrder::TopDown,
562    )
563});
564
565static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
566    OptimizationStage::new(
567        "Iceberg Count Star Optimization",
568        vec![IcebergCountStarRule::create()],
569        ApplyOrder::BottomUp,
570    )
571});
572
573static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
574    OptimizationStage::new(
575        "TopN to Vector Search",
576        vec![TopNToVectorSearchRule::create()],
577        ApplyOrder::BottomUp,
578    )
579});
580
581static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
582    LazyLock::new(|| {
583        OptimizationStage::new(
584            "Correlated TopN to Vector Search",
585            vec![CorrelatedTopNToVectorSearchRule::create(true)],
586            ApplyOrder::BottomUp,
587        )
588    });
589
590static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
591    LazyLock::new(|| {
592        OptimizationStage::new(
593            "Correlated TopN to Vector Search",
594            vec![CorrelatedTopNToVectorSearchRule::create(false)],
595            ApplyOrder::BottomUp,
596        )
597    });
598
599static BATCH_MV_SELECTION: LazyLock<OptimizationStage> = LazyLock::new(|| {
600    OptimizationStage::new(
601        "Batch Mv Selection",
602        vec![MvSelectionRule::create()],
603        ApplyOrder::TopDown,
604    )
605});
606
607impl LogicalOptimizer {
608    pub fn predicate_pushdown(
609        plan: LogicalPlanRef,
610        explain_trace: bool,
611        ctx: &OptimizerContextRef,
612    ) -> LogicalPlanRef {
613        let plan = plan.predicate_pushdown(
614            Condition::true_cond(),
615            &mut PredicatePushdownContext::new(plan.clone()),
616        );
617        if explain_trace {
618            ctx.trace("Predicate Push Down:");
619            ctx.trace(plan.explain_to_string());
620        }
621        plan
622    }
623
624    pub fn subquery_unnesting(
625        mut plan: LogicalPlanRef,
626        enable_share_plan: bool,
627        explain_trace: bool,
628        ctx: &OptimizerContextRef,
629    ) -> Result<LogicalPlanRef> {
630        // Bail our if no apply operators.
631        if !has_logical_apply(plan.clone()) {
632            return Ok(plan);
633        }
634        // Simple Unnesting.
635        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
636        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
637        // Predicate push down before translate apply, because we need to calculate the domain
638        // and predicate push down can reduce the size of domain.
639        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
640        // In order to unnest values with correlated input ref, we need to extract project first.
641        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
642        // General Unnesting.
643        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
644        // join.
645        plan = if enable_share_plan {
646            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
647        } else {
648            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
649        };
650        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
651
652        // Check if all `Apply`s are eliminated and the subquery is unnested.
653        plan.check_apply_elimination()?;
654
655        Ok(plan)
656    }
657
658    pub fn column_pruning(
659        mut plan: LogicalPlanRef,
660        explain_trace: bool,
661        ctx: &OptimizerContextRef,
662    ) -> LogicalPlanRef {
663        let required_cols = (0..plan.schema().len()).collect_vec();
664        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
665        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
666        // Column pruning may introduce additional projects, and filter can be pushed again.
667        if explain_trace {
668            ctx.trace("Prune Columns:");
669            ctx.trace(plan.explain_to_string());
670        }
671
672        if column_pruning_ctx.need_second_round() {
673            // Second round of column pruning and reuse the column pruning context.
674            // Try to replace original share operator with the new one.
675            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
676            if explain_trace {
677                ctx.trace("Prune Columns (For DAG):");
678                ctx.trace(plan.explain_to_string());
679            }
680        }
681        plan
682    }
683
684    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
685        // If now() and proctime() are not found, bail out.
686        let mut v = NowProcTimeFinder::default();
687        plan.visit_exprs_recursive(&mut v);
688        if !v.has() {
689            return plan;
690        }
691
692        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
693
694        let plan = plan.rewrite_exprs_recursive(&mut v);
695
696        if ctx.is_explain_trace() {
697            ctx.trace("Inline Now and ProcTime:");
698            ctx.trace(plan.explain_to_string());
699        }
700        plan
701    }
702
703    pub fn gen_optimized_logical_plan_for_stream(
704        mut plan: LogicalPlanRef,
705    ) -> Result<LogicalPlanRef> {
706        let ctx = plan.ctx();
707        let explain_trace = ctx.is_explain_trace();
708
709        if explain_trace {
710            ctx.trace("Begin:");
711            ctx.trace(plan.explain_to_string());
712        }
713
714        // Convert grouping sets at first because other agg rule can't handle grouping sets.
715        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
716        // Remove nodes with constant output.
717        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
718        // Remove project to make common sub-plan sharing easier.
719        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
720
721        // If share plan is disable, we need to remove all the share operator generated by the
722        // binder, e.g. CTE and View. However, we still need to share source to ensure self
723        // source join can return correct result.
724        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
725        if enable_share_plan {
726            // Common sub-plan sharing.
727            plan = plan.common_subplan_sharing();
728            plan = plan.prune_share();
729            if explain_trace {
730                ctx.trace("Common Sub-plan Sharing:");
731                ctx.trace(plan.explain_to_string());
732            }
733        } else {
734            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
735
736            // Replace source to share source.
737            // Perform share source at the beginning so that we can benefit from predicate pushdown
738            // and column pruning for the share operator.
739            plan = ShareSourceRewriter::share_source(plan);
740            if explain_trace {
741                ctx.trace("Share Source:");
742                ctx.trace(plan.explain_to_string());
743            }
744        }
745        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
746        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
747        // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
748        // Should be applied before converting table function to project set.
749        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
750        // In order to unnest a table function, we need to convert it into a `project_set` first.
751        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
752
753        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;
754
755        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
756        if has_logical_max_one_row(plan.clone()) {
757            // `MaxOneRow` is currently only used for the runtime check of
758            // scalar subqueries, while it's not supported in streaming mode, so
759            // we raise a precise error here.
760            bail!("Scalar subquery might produce more than one row.");
761        }
762
763        // Same to batch plan optimization, this rule shall be applied before
764        // predicate push down
765        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
766
767        // Predicate Push-down
768        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
769
770        if plan.ctx().session_ctx().config().enable_join_ordering() {
771            // Merge inner joins and intermediate filters into multijoin
772            // This rule assumes that filters have already been pushed down near to
773            // their relevant joins.
774            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
775
776            // Reorder multijoin into join tree.
777            if plan
778                .ctx()
779                .session_ctx()
780                .config()
781                .streaming_enable_bushy_join()
782            {
783                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
784            } else {
785                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
786            }
787        }
788
789        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
790        // conditions into a filter above the multijoin.
791        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
792
793        // For stream, push down predicates with now into a left-semi join
794        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
795
796        // Push down the calculation of inputs of join's condition.
797        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
798
799        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
800        // Must push down predicates again after split over window so that OverWindow can be
801        // optimized to TopN.
802        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
803        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
804        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
805
806        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
807        // TODO: better naming of the OptimizationStage
808        // Convert distinct aggregates.
809        plan = if force_split_distinct_agg {
810            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
811        } else {
812            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
813        };
814
815        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
816
817        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
818
819        // Do a final column pruning and predicate pushing down to clean up the plan.
820        plan = Self::column_pruning(plan, explain_trace, &ctx);
821        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
822
823        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
824        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
825
826        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
827
828        #[cfg(debug_assertions)]
829        InputRefValidator.validate(plan.clone());
830
831        ctx.may_store_explain_logical(&plan);
832
833        Ok(plan)
834    }
835
836    pub fn gen_optimized_logical_plan_for_batch(
837        mut plan: LogicalPlanRef,
838    ) -> Result<LogicalPlanRef> {
839        let ctx = plan.ctx();
840        let explain_trace = ctx.is_explain_trace();
841
842        if explain_trace {
843            ctx.trace("Begin:");
844            ctx.trace(plan.explain_to_string());
845        }
846
847        if ctx.session_ctx().config().enable_mv_selection() {
848            let query_relations =
849                RelationCollectorVisitor::collect_with(HashSet::new(), plan.clone());
850            Self::register_batch_mview_candidates(ctx.session_ctx(), &ctx, &query_relations);
851            plan = plan.optimize_by_rules(&BATCH_MV_SELECTION)?;
852        }
853
854        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
855        plan = Self::inline_now_proc_time(plan, &ctx);
856
857        // Convert the dag back to the tree, because we don't support DAG plan for batch.
858        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
859
860        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
861        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
862        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
863        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
864        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
865        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
866        // Table function should be converted into `file_scan` before `project_set`.
867        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
868        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
869        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
870        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
871        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
872        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
873        // In order to unnest a table function, we need to convert it into a `project_set` first.
874        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
875
876        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
877
878        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
879
880        // Filter simplification must be applied before predicate push-down
881        // otherwise the filter for some nodes (e.g., `LogicalScan`)
882        // may not be properly applied.
883        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
884
885        // Predicate Push-down
886        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
887        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
888
889        if plan.ctx().session_ctx().config().enable_join_ordering() {
890            // Merge inner joins and intermediate filters into multijoin
891            // This rule assumes that filters have already been pushed down near to
892            // their relevant joins.
893            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
894
895            // Reorder multijoin into left-deep join tree.
896            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
897        }
898
899        // Predicate Push-down: apply filter pushdown rules again since we pullup all join
900        // conditions into a filter above the multijoin.
901        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
902            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
903            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
904        }
905
906        // Push down the calculation of inputs of join's condition.
907        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
908
909        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
910        // Must push down predicates again after split over window so that OverWindow can be
911        // optimized to TopN.
912        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
913            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
914            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
915        }
916        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
917        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
918
919        // Convert distinct aggregates.
920        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
921
922        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
923
924        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
925
926        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
927
928        // Do a final column pruning and predicate pushing down to clean up the plan.
929        plan = Self::column_pruning(plan, explain_trace, &ctx);
930        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
931            (#[allow(unused_assignments)]
932            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
933            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
934        }
935
936        // Materialize Iceberg intermediate scans after predicate pushdown and column pruning.
937        // This converts LogicalIcebergIntermediateScan to LogicalIcebergScan with anti-joins
938        // for delete files.
939        plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;
940
941        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
942        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
943
944        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
945
946        // This need to be apply after PROJECT_REMOVE to ensure there is no projection between agg and iceberg scan.
947        plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;
948
949        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
950
951        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
952
953        plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;
954
955        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
956
957        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
958
959        #[cfg(debug_assertions)]
960        InputRefValidator.validate(plan.clone());
961
962        ctx.may_store_explain_logical(&plan);
963
964        Ok(plan)
965    }
966
967    fn register_batch_mview_candidates(
968        session: &SessionImpl,
969        context: &OptimizerContextRef,
970        query_relations: &HashSet<ObjectId>,
971    ) {
972        let catalog_reader = session.env().catalog_reader().read_guard();
973        let user_reader = session.env().user_info_reader().read_guard();
974        let Some(current_user) = user_reader.get_user_by_name(&session.user_name()) else {
975            return;
976        };
977        let mut mv_dependencies: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
978        let mut mviews_with_source_dependency: HashSet<ObjectId> = HashSet::new();
979        for dep in catalog_reader.iter_object_dependencies() {
980            mv_dependencies
981                .entry(dep.object_id)
982                .or_default()
983                .insert(dep.referenced_object_id);
984            if dep.referenced_object_type == PbObjectType::Source {
985                mviews_with_source_dependency.insert(dep.object_id);
986            }
987        }
988        let db_name = session.database();
989        let Ok(schemas) = catalog_reader.iter_schemas(&db_name) else {
990            return;
991        };
992
993        for schema in schemas {
994            for mv in schema.iter_created_mvs_with_acl(current_user) {
995                let mv_object_id = mv.id().as_object_id();
996                if mviews_with_source_dependency.contains(&mv_object_id) {
997                    continue;
998                }
999                let is_subset = mv_dependencies
1000                    .get(&mv_object_id)
1001                    .is_some_and(|deps| deps.is_subset(query_relations));
1002                if !is_subset {
1003                    continue;
1004                }
1005                let Ok(stmt) = mv.create_sql_ast() else {
1006                    continue;
1007                };
1008                let Statement::CreateView {
1009                    materialized: true,
1010                    query,
1011                    ..
1012                } = stmt
1013                else {
1014                    continue;
1015                };
1016                let mut binder = Binder::new_for_batch(session);
1017                let Ok(bound_query) = binder.bind_query(&query) else {
1018                    continue;
1019                };
1020                let mut planner = Planner::new_for_batch_dql(context.clone());
1021                let Ok(plan_root) = planner.plan_query(bound_query) else {
1022                    continue;
1023                };
1024                context.add_batch_mview_candidate(mv.clone(), plan_root.plan.clone());
1025            }
1026        }
1027    }
1028}