risingwave_frontend/optimizer/
logical_optimization.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32    HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Binder, Explain, OptimizerContextRef, Planner};
37
38impl<C: ConventionMarker> PlanRef<C> {
39    fn optimize_by_rules_inner(
40        self,
41        heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42        stage_name: &str,
43    ) -> Result<PlanRef<C>> {
44        let ctx = self.ctx();
45
46        let result = heuristic_optimizer.optimize(self);
47        let stats = heuristic_optimizer.get_stats();
48
49        if ctx.is_explain_trace() && stats.has_applied_rule() {
50            ctx.trace(format!("{}:", stage_name));
51            ctx.trace(format!("{}", stats));
52            ctx.trace(match &result {
53                Ok(plan) => plan.explain_to_string(),
54                Err(error) => format!("Optimization failed: {}", error.as_report()),
55            });
56        }
57        ctx.add_rule_applied(stats.total_applied());
58
59        result
60    }
61
62    pub(crate) fn optimize_by_rules(
63        self,
64        OptimizationStage {
65            stage_name,
66            rules,
67            apply_order,
68        }: &OptimizationStage<C>,
69    ) -> Result<PlanRef<C>> {
70        self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71    }
72
73    pub(crate) fn optimize_by_rules_until_fix_point(
74        mut self,
75        OptimizationStage {
76            stage_name,
77            rules,
78            apply_order,
79        }: &OptimizationStage<C>,
80    ) -> Result<PlanRef<C>> {
81        loop {
82            let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83            self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84            if !heuristic_optimizer.get_stats().has_applied_rule() {
85                return Ok(self);
86            }
87        }
88    }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92    stage_name: String,
93    rules: Vec<BoxedRule<C>>,
94    apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98    pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99    where
100        S: Into<String>,
101    {
102        OptimizationStage {
103            stage_name: name.into(),
104            rules,
105            apply_order,
106        }
107    }
108}
109
110use std::collections::{HashMap, HashSet};
111use std::sync::LazyLock;
112
113use risingwave_common::id::ObjectId;
114use risingwave_pb::common::PbObjectType;
115use risingwave_sqlparser::ast::Statement;
116
117use crate::optimizer::plan_node::generic::GenericPlanRef;
118use crate::optimizer::plan_visitor::RelationCollectorVisitor;
119use crate::session::SessionImpl;
120
/// Fieldless type used as a namespace. The logical optimization entry points (e.g.
/// `predicate_pushdown`, `gen_optimized_logical_plan_for_stream`) are associated functions
/// on it, and each takes the plan to optimize as an argument.
pub struct LogicalOptimizer {}
122
123static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
124    OptimizationStage::new(
125        "DAG To Tree",
126        vec![DagToTreeRule::create()],
127        ApplyOrder::TopDown,
128    )
129});
130
131static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
132    OptimizationStage::new(
133        "Convert GENERATE_SERIES Ends With NOW",
134        vec![GenerateSeriesWithNowRule::create()],
135        ApplyOrder::TopDown,
136    )
137});
138
139static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
140    OptimizationStage::new(
141        "Table Function Convert",
142        vec![
143            // Apply file scan rule first
144            TableFunctionToFileScanRule::create(),
145            // Apply internal backfill progress rule first
146            TableFunctionToInternalBackfillProgressRule::create(),
147            // Apply internal source backfill progress rule next
148            TableFunctionToInternalSourceBackfillProgressRule::create(),
149            // Apply internal get channel delta stats rule next
150            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
151            // Apply postgres query rule next
152            TableFunctionToPostgresQueryRule::create(),
153            // Apply mysql query rule next
154            TableFunctionToMySqlQueryRule::create(),
155            // Apply project set rule last
156            TableFunctionToProjectSetRule::create(),
157        ],
158        ApplyOrder::TopDown,
159    )
160});
161
162static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
163    OptimizationStage::new(
164        "Table Function To FileScan",
165        vec![TableFunctionToFileScanRule::create()],
166        ApplyOrder::TopDown,
167    )
168});
169
170static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
171    OptimizationStage::new(
172        "Table Function To PostgresQuery",
173        vec![TableFunctionToPostgresQueryRule::create()],
174        ApplyOrder::TopDown,
175    )
176});
177
178static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
179    OptimizationStage::new(
180        "Table Function To MySQL",
181        vec![TableFunctionToMySqlQueryRule::create()],
182        ApplyOrder::TopDown,
183    )
184});
185
186static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
187    LazyLock::new(|| {
188        OptimizationStage::new(
189            "Table Function To Internal Backfill Progress",
190            vec![TableFunctionToInternalBackfillProgressRule::create()],
191            ApplyOrder::TopDown,
192        )
193    });
194
195static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
196    LazyLock::new(|| {
197        OptimizationStage::new(
198            "Table Function To Internal Source Backfill Progress",
199            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
200            ApplyOrder::TopDown,
201        )
202    });
203
204static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
205    LazyLock::new(|| {
206        OptimizationStage::new(
207            "Table Function To Internal Get Channel Delta Stats",
208            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
209            ApplyOrder::TopDown,
210        )
211    });
212
213static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
214    OptimizationStage::new(
215        "Values Extract Project",
216        vec![ValuesExtractProjectRule::create()],
217        ApplyOrder::TopDown,
218    )
219});
220
221static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
222    OptimizationStage::new(
223        "Simple Unnesting",
224        vec![
225            // Pull correlated predicates up the algebra tree to unnest simple subquery.
226            PullUpCorrelatedPredicateRule::create(),
227            // Pull correlated project expressions with values to inline scalar subqueries.
228            PullUpCorrelatedProjectValueRule::create(),
229            PullUpCorrelatedPredicateAggRule::create(),
230            // Eliminate max one row
231            MaxOneRowEliminateRule::create(),
232            // Eliminate lateral table-function apply into a unary ProjectSet.
233            ApplyTableFunctionToProjectSetRule::create(),
234            // Convert apply to join.
235            ApplyToJoinRule::create(),
236        ],
237        ApplyOrder::BottomUp,
238    )
239});
240
241static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
242    OptimizationStage::new(
243        "Set Operation Merge",
244        vec![
245            UnionMergeRule::create(),
246            IntersectMergeRule::create(),
247            ExceptMergeRule::create(),
248        ],
249        ApplyOrder::BottomUp,
250    )
251});
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
254    LazyLock::new(|| {
255        OptimizationStage::new(
256            "General Unnesting(Translate Apply)",
257            vec![TranslateApplyRule::create(true)],
258            ApplyOrder::TopDown,
259        )
260    });
261
262static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
263    LazyLock::new(|| {
264        OptimizationStage::new(
265            "General Unnesting(Translate Apply)",
266            vec![TranslateApplyRule::create(false)],
267            ApplyOrder::TopDown,
268        )
269    });
270
271static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
272    OptimizationStage::new(
273        "General Unnesting(Push Down Apply)",
274        vec![
275            ApplyEliminateRule::create(),
276            ApplyAggTransposeRule::create(),
277            ApplyDedupTransposeRule::create(),
278            ApplyFilterTransposeRule::create(),
279            ApplyProjectTransposeRule::create(),
280            ApplyProjectSetTransposeRule::create(),
281            ApplyTopNTransposeRule::create(),
282            ApplyLimitTransposeRule::create(),
283            ApplyJoinTransposeRule::create(),
284            ApplyUnionTransposeRule::create(),
285            ApplyOverWindowTransposeRule::create(),
286            ApplyExpandTransposeRule::create(),
287            ApplyHopWindowTransposeRule::create(),
288            CrossJoinEliminateRule::create(),
289            ApplyShareEliminateRule::create(),
290        ],
291        ApplyOrder::TopDown,
292    )
293});
294
295static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
296    OptimizationStage::new(
297        "To MultiJoin",
298        vec![MergeMultiJoinRule::create()],
299        ApplyOrder::TopDown,
300    )
301});
302
303static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
304    OptimizationStage::new(
305        "Join Ordering".to_owned(),
306        vec![LeftDeepTreeJoinOrderingRule::create()],
307        ApplyOrder::TopDown,
308    )
309});
310
311static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
312    OptimizationStage::new(
313        "Join Ordering".to_owned(),
314        vec![BushyTreeJoinOrderingRule::create()],
315        ApplyOrder::TopDown,
316    )
317});
318
319static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
320    OptimizationStage::new(
321        "Push down filter with now into a left semijoin",
322        vec![
323            SplitNowAndRule::create(),
324            SplitNowOrRule::create(),
325            FilterWithNowToJoinRule::create(),
326        ],
327        ApplyOrder::TopDown,
328    )
329});
330
331static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
332    OptimizationStage::new(
333        "Push down the calculation of inputs of join's condition",
334        vec![PushCalculationOfJoinRule::create()],
335        ApplyOrder::TopDown,
336    )
337});
338
339static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
340    OptimizationStage::new(
341        "Convert Distinct Aggregation",
342        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
343        ApplyOrder::TopDown,
344    )
345});
346
347static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
348    OptimizationStage::new(
349        "Convert Distinct Aggregation",
350        vec![
351            UnionToDistinctRule::create(),
352            DistinctAggRule::create(false),
353        ],
354        ApplyOrder::TopDown,
355    )
356});
357
358static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
359    OptimizationStage::new(
360        "Simplify Aggregation",
361        vec![
362            AggGroupBySimplifyRule::create(),
363            AggCallMergeRule::create(),
364            UnifyFirstLastValueRule::create(),
365        ],
366        ApplyOrder::TopDown,
367    )
368});
369
370static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
371    OptimizationStage::new(
372        "Join Commute".to_owned(),
373        vec![JoinCommuteRule::create()],
374        ApplyOrder::TopDown,
375    )
376});
377
378static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
379    OptimizationStage::new(
380        "Constant Output Operator Remove",
381        vec![EmptyAggRemoveRule::create()],
382        ApplyOrder::TopDown,
383    )
384});
385
386static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
387    OptimizationStage::new(
388        "Project Remove",
389        vec![
390            // merge should be applied before eliminate
391            ProjectMergeRule::create(),
392            ProjectEliminateRule::create(),
393            TrivialProjectToValuesRule::create(),
394            UnionInputValuesMergeRule::create(),
395            JoinProjectTransposeRule::create(),
396            // project-join merge should be applied after merge
397            // eliminate and to values
398            ProjectJoinMergeRule::create(),
399            AggProjectMergeRule::create(),
400        ],
401        ApplyOrder::BottomUp,
402    )
403});
404
405static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
406    OptimizationStage::new(
407        "Split Over Window",
408        vec![OverWindowSplitRule::create()],
409        ApplyOrder::TopDown,
410    )
411});
412
413// the `OverWindowToTopNRule` need to match the pattern of Proj-Filter-OverWindow so it is
414// 1. conflict with `ProjectJoinMergeRule`, `AggProjectMergeRule` or other rules
415// 2. should be after merge the multiple projects
416static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
417    OptimizationStage::new(
418        "Convert Over Window",
419        vec![
420            ProjectMergeRule::create(),
421            ProjectEliminateRule::create(),
422            TrivialProjectToValuesRule::create(),
423            UnionInputValuesMergeRule::create(),
424            OverWindowToAggAndJoinRule::create(),
425            OverWindowToTopNRule::create(),
426        ],
427        ApplyOrder::TopDown,
428    )
429});
430
431// DataFusion cannot apply `OverWindowToTopNRule`
432static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
433    OptimizationStage::new(
434        "Convert Over Window",
435        vec![
436            ProjectMergeRule::create(),
437            ProjectEliminateRule::create(),
438            TrivialProjectToValuesRule::create(),
439            UnionInputValuesMergeRule::create(),
440            OverWindowToAggAndJoinRule::create(),
441        ],
442        ApplyOrder::TopDown,
443    )
444});
445
446static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
447    OptimizationStage::new(
448        "Merge Over Window",
449        vec![OverWindowMergeRule::create()],
450        ApplyOrder::TopDown,
451    )
452});
453
454static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
455    OptimizationStage::new(
456        "Rewrite Like Expr",
457        vec![RewriteLikeExprRule::create()],
458        ApplyOrder::TopDown,
459    )
460});
461
462static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
463    OptimizationStage::new(
464        "TopN/SimpleAgg on Index",
465        vec![
466            TopNProjectTransposeRule::create(),
467            TopNOnIndexRule::create(),
468            MinMaxOnIndexRule::create(),
469        ],
470        ApplyOrder::TopDown,
471    )
472});
473
474static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
475    OptimizationStage::new(
476        "Project TopN Transpose",
477        vec![ProjectTopNTransposeRule::create()],
478        ApplyOrder::TopDown,
479    )
480});
481
482static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
483    OptimizationStage::new(
484        "Void always-false filter's downstream",
485        vec![AlwaysFalseFilterRule::create()],
486        ApplyOrder::TopDown,
487    )
488});
489
490static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
491    OptimizationStage::new(
492        "Push Down Limit",
493        vec![LimitPushDownRule::create()],
494        ApplyOrder::TopDown,
495    )
496});
497
498static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
499    OptimizationStage::new(
500        "Pull Up Hop",
501        vec![PullUpHopRule::create()],
502        ApplyOrder::BottomUp,
503    )
504});
505
506static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
507    OptimizationStage::new(
508        "Set Operation To Join",
509        vec![
510            IntersectToSemiJoinRule::create(),
511            ExceptToAntiJoinRule::create(),
512        ],
513        ApplyOrder::BottomUp,
514    )
515});
516
517static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
518    OptimizationStage::new(
519        "Grouping Sets",
520        vec![
521            GroupingSetsToExpandRule::create(),
522            ExpandToProjectRule::create(),
523        ],
524        ApplyOrder::TopDown,
525    )
526});
527
528static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
529    OptimizationStage::new(
530        "Common Sub Expression Extract",
531        vec![CommonSubExprExtractRule::create()],
532        ApplyOrder::TopDown,
533    )
534});
535
536static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
537    OptimizationStage::new(
538        "Logical Filter Expression Simplify",
539        vec![LogicalFilterExpressionSimplifyRule::create()],
540        ApplyOrder::TopDown,
541    )
542});
543
544static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
545    OptimizationStage::new(
546        "Rewrite Source For Batch",
547        vec![SourceToKafkaScanRule::create()],
548        ApplyOrder::TopDown,
549    )
550});
551
552static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
553    OptimizationStage::new(
554        "Materialize Iceberg Scan",
555        vec![
556            // When storage mode is auto, may rewrite Iceberg intermediate scan to Hummock scan based on statistics.
557            IcebergEngineStorageSelectionRule::create(),
558            // This converts LogicalIcebergIntermediateScan to LogicalIcebergScan with anti-joins
559            // for delete files.
560            IcebergIntermediateScanRule::create(),
561        ],
562        ApplyOrder::TopDown,
563    )
564});
565
566static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
567    OptimizationStage::new(
568        "Iceberg Count Star Optimization",
569        vec![IcebergCountStarRule::create()],
570        ApplyOrder::BottomUp,
571    )
572});
573
574static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
575    OptimizationStage::new(
576        "TopN to Vector Search",
577        vec![TopNToVectorSearchRule::create()],
578        ApplyOrder::BottomUp,
579    )
580});
581
582static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
583    LazyLock::new(|| {
584        OptimizationStage::new(
585            "Correlated TopN to Vector Search",
586            vec![CorrelatedTopNToVectorSearchRule::create(true)],
587            ApplyOrder::BottomUp,
588        )
589    });
590
591static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
592    LazyLock::new(|| {
593        OptimizationStage::new(
594            "Correlated TopN to Vector Search",
595            vec![CorrelatedTopNToVectorSearchRule::create(false)],
596            ApplyOrder::BottomUp,
597        )
598    });
599
600static BATCH_MV_SELECTION: LazyLock<OptimizationStage> = LazyLock::new(|| {
601    OptimizationStage::new(
602        "Batch Mv Selection",
603        vec![MvSelectionRule::create()],
604        ApplyOrder::TopDown,
605    )
606});
607
608impl LogicalOptimizer {
609    pub fn predicate_pushdown(
610        plan: LogicalPlanRef,
611        explain_trace: bool,
612        ctx: &OptimizerContextRef,
613    ) -> LogicalPlanRef {
614        let plan = plan.predicate_pushdown(
615            Condition::true_cond(),
616            &mut PredicatePushdownContext::new(plan.clone()),
617        );
618        if explain_trace {
619            ctx.trace("Predicate Push Down:");
620            ctx.trace(plan.explain_to_string());
621        }
622        plan
623    }
624
625    pub fn subquery_unnesting(
626        mut plan: LogicalPlanRef,
627        enable_share_plan: bool,
628        explain_trace: bool,
629        ctx: &OptimizerContextRef,
630    ) -> Result<LogicalPlanRef> {
631        // Bail our if no apply operators.
632        if !has_logical_apply(plan.clone()) {
633            return Ok(plan);
634        }
635        // Simple Unnesting.
636        plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
637        debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
638        // Predicate push down before translate apply, because we need to calculate the domain
639        // and predicate push down can reduce the size of domain.
640        plan = Self::predicate_pushdown(plan, explain_trace, ctx);
641        // In order to unnest values with correlated input ref, we need to extract project first.
642        plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
643        // General Unnesting.
644        // Translate Apply, push Apply down the plan and finally replace Apply with regular inner
645        // join.
646        plan = if enable_share_plan {
647            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
648        } else {
649            plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
650        };
651        plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
652
653        // Check if all `Apply`s are eliminated and the subquery is unnested.
654        plan.check_apply_elimination()?;
655
656        Ok(plan)
657    }
658
659    pub fn column_pruning(
660        mut plan: LogicalPlanRef,
661        explain_trace: bool,
662        ctx: &OptimizerContextRef,
663    ) -> LogicalPlanRef {
664        let required_cols = (0..plan.schema().len()).collect_vec();
665        let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
666        plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
667        // Column pruning may introduce additional projects, and filter can be pushed again.
668        if explain_trace {
669            ctx.trace("Prune Columns:");
670            ctx.trace(plan.explain_to_string());
671        }
672
673        if column_pruning_ctx.need_second_round() {
674            // Second round of column pruning and reuse the column pruning context.
675            // Try to replace original share operator with the new one.
676            plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
677            if explain_trace {
678                ctx.trace("Prune Columns (For DAG):");
679                ctx.trace(plan.explain_to_string());
680            }
681        }
682        plan
683    }
684
685    pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
686        // If now() and proctime() are not found, bail out.
687        let mut v = NowProcTimeFinder::default();
688        plan.visit_exprs_recursive(&mut v);
689        if !v.has() {
690            return plan;
691        }
692
693        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
694
695        let plan = plan.rewrite_exprs_recursive(&mut v);
696
697        if ctx.is_explain_trace() {
698            ctx.trace("Inline Now and ProcTime:");
699            ctx.trace(plan.explain_to_string());
700        }
701        plan
702    }
703
    /// Runs the logical optimization pipeline for a streaming query and returns the
    /// optimized logical plan.
    ///
    /// Intermediate plans are traced when explain-trace is enabled on the plan's context. The
    /// final plan is passed to `ctx.may_store_explain_logical`.
    ///
    /// # Errors
    /// Propagates errors from any optimization stage and from subquery unnesting. Also fails
    /// with "Scalar subquery might produce more than one row." if a `MaxOneRow` survives
    /// unnesting, because the runtime check it represents is not supported in streaming mode.
    pub fn gen_optimized_logical_plan_for_stream(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert grouping sets first, because the other aggregation rules can't handle
        // grouping sets.
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        // Remove nodes with constant output.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        // Remove projects to make common sub-plan sharing easier.
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        // If share plan is disabled, we need to remove all the share operators generated by
        // the binder, e.g. for CTEs and views. However, we still need to share sources so that
        // a self source join returns the correct result.
        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
        if enable_share_plan {
            // Common sub-plan sharing.
            plan = plan.common_subplan_sharing();
            plan = plan.prune_share();
            if explain_trace {
                ctx.trace("Common Sub-plan Sharing:");
                ctx.trace(plan.explain_to_string());
            }
        } else {
            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

            // Replace sources with share sources.
            // Do this at the beginning so that the share operator benefits from predicate
            // pushdown and column pruning.
            plan = ShareSourceRewriter::share_source(plan);
            if explain_trace {
                ctx.trace("Share Source:");
                ctx.trace(plan.explain_to_string());
            }
        }
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        // Convert a `generate_series` ending with `now()` into a `Now` source. Streaming mode
        // only. Must be applied before converting table functions to project set.
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        if has_logical_max_one_row(plan.clone()) {
            // `MaxOneRow` is currently only used for the runtime check of
            // scalar subqueries, while it's not supported in streaming mode, so
            // we raise a precise error here.
            bail!("Scalar subquery might produce more than one row.");
        }

        // As in batch plan optimization, this rule must be applied before
        // predicate push-down.
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin.
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into a join tree: bushy when `streaming_enable_bushy_join`
            // is set, left-deep otherwise.
            if plan
                .ctx()
                .session_ctx()
                .config()
                .streaming_enable_bushy_join()
            {
                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
            } else {
                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
            }
        }

        // Predicate Push-down: apply filter pushdown rules again, since all join
        // conditions were pulled up into a filter above the multijoin.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // For stream, push down predicates with now into a left-semi join
        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
        // TODO: better naming of the OptimizationStage
        // Convert distinct aggregates. When `force_split_distinct_agg` is set, the batch
        // variant of the stage is used.
        plan = if force_split_distinct_agg {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
        } else {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
        };

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // Debug builds only: run `InputRefValidator` over the final plan.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
836
    /// Runs the logical optimization pipeline for a batch query and returns the optimized plan.
    ///
    /// The pipeline is a fixed, ordered sequence of rule stages (`optimize_by_rules`),
    /// subquery unnesting, predicate push-down and column pruning. The order of the stages
    /// matters: several stages depend on work done by earlier ones (see the inline comments).
    ///
    /// When explain tracing is enabled on the plan's context, the input plan is traced under
    /// "Begin:". The final plan is handed to `ctx.may_store_explain_logical`, and in debug
    /// builds its input references are validated with `InputRefValidator`.
    ///
    /// # Errors
    ///
    /// Returns an error if any rule stage or subquery unnesting fails.
    pub fn gen_optimized_logical_plan_for_batch(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Optional materialized-view selection: collect the relations referenced by the query,
        // register visible MVs whose dependencies are all among those relations as candidates,
        // then run `BATCH_MV_SELECTION` (presumably rewrites parts of the plan to read from the
        // registered candidates — see that rule for details).
        if ctx.session_ctx().config().enable_mv_selection() {
            let query_relations =
                RelationCollectorVisitor::collect_with(HashSet::new(), plan.clone());
            Self::register_batch_mview_candidates(ctx.session_ctx(), &ctx, &query_relations);
            plan = plan.optimize_by_rules(&BATCH_MV_SELECTION)?;
        }

        // Inline `NOW()` and `PROCTIME()`, only for batch queries.
        plan = Self::inline_now_proc_time(plan, &ctx);

        // Convert the dag back to the tree, because we don't support DAG plan for batch.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
        // Table function should be converted into `file_scan` before `project_set`.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;

        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;

        // Filter simplification must be applied before predicate push-down
        // otherwise the filter for some nodes (e.g., `LogicalScan`)
        // may not be properly applied.
        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Predicate Push-down
        //
        // `last_total_rule_applied_before_predicate_pushdown` snapshots
        // `ctx.total_rule_applied()` at each predicate push-down. Each later push-down in this
        // function is skipped when the counter is unchanged since the previous snapshot, i.e.
        // when no rule has been recorded as applied in between.
        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        if plan.ctx().session_ctx().config().enable_join_ordering() {
            // Merge inner joins and intermediate filters into multijoin
            // This rule assumes that filters have already been pushed down near to
            // their relevant joins.
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            // Reorder multijoin into left-deep join tree.
            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
        }

        // Predicate Push-down: apply filter pushdown rules again since we pull up all join
        // conditions into a filter above the multijoin. Skipped when no rule was applied since
        // the previous push-down.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        // Push down the calculation of inputs of join's condition.
        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        // Must push down predicates again after split over window so that OverWindow can be
        // optimized to TopN (only if some rule was applied since the previous push-down).
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        // Convert distinct aggregates.
        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;

        // Do a final column pruning and predicate pushing down to clean up the plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            // The updated snapshot is never read after this point; the attribute silences the
            // `unused_assignments` lint. It is attached to the parenthesized place expression
            // because an outer attribute may not directly precede an assignment expression.
            (#[allow(unused_assignments)]
            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        // Materialize Iceberg intermediate scans after predicate pushdown and column pruning.
        plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;

        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // This needs to be applied after PROJECT_REMOVE to ensure there is no projection between
        // the agg and the iceberg scan.
        plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;

        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;

        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;

        plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;

        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;

        // Run DAG_TO_TREE once more so the final batch plan is a tree again, since DAG plans
        // are not supported for batch (as noted at the first DAG_TO_TREE stage).
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
965
966    fn register_batch_mview_candidates(
967        session: &SessionImpl,
968        context: &OptimizerContextRef,
969        query_relations: &HashSet<ObjectId>,
970    ) {
971        let catalog_reader = session.env().catalog_reader().read_guard();
972        let user_reader = session.env().user_info_reader().read_guard();
973        let Some(current_user) = user_reader.get_user_by_name(&session.user_name()) else {
974            return;
975        };
976        let mut mv_dependencies: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
977        let mut mviews_with_source_dependency: HashSet<ObjectId> = HashSet::new();
978        for dep in catalog_reader.iter_object_dependencies() {
979            mv_dependencies
980                .entry(dep.object_id)
981                .or_default()
982                .insert(dep.referenced_object_id);
983            if dep.referenced_object_type == PbObjectType::Source {
984                mviews_with_source_dependency.insert(dep.object_id);
985            }
986        }
987        let db_name = session.database();
988        let Ok(schemas) = catalog_reader.iter_schemas(&db_name) else {
989            return;
990        };
991
992        for schema in schemas {
993            for mv in schema.iter_created_mvs_with_acl(current_user) {
994                let mv_object_id = mv.id().as_object_id();
995                if mviews_with_source_dependency.contains(&mv_object_id) {
996                    continue;
997                }
998                let is_subset = mv_dependencies
999                    .get(&mv_object_id)
1000                    .is_some_and(|deps| deps.is_subset(query_relations));
1001                if !is_subset {
1002                    continue;
1003                }
1004                let Ok(stmt) = mv.create_sql_ast() else {
1005                    continue;
1006                };
1007                let Statement::CreateView {
1008                    materialized: true,
1009                    query,
1010                    ..
1011                } = stmt
1012                else {
1013                    continue;
1014                };
1015                let mut binder = Binder::new_for_batch(session);
1016                let Ok(bound_query) = binder.bind_query(&query) else {
1017                    continue;
1018                };
1019                let mut planner = Planner::new_for_batch_dql(context.clone());
1020                let Ok(plan_root) = planner.plan_query(bound_query) else {
1021                    continue;
1022                };
1023                context.add_batch_mview_candidate(mv.clone(), plan_root.plan.clone());
1024            }
1025        }
1026    }
1027}