1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42 stage_name: &str,
43 ) -> Result<PlanRef> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage,
69 ) -> Result<PlanRef> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage,
80 ) -> Result<PlanRef> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
91pub struct OptimizationStage {
92 stage_name: String,
93 rules: Vec<BoxedRule>,
94 apply_order: ApplyOrder,
95}
96
97impl OptimizationStage {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Field-less type whose associated functions implement the logical optimization passes
/// and the stream/batch logical optimization pipelines.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToPostgresQueryRule::create(),
140 TableFunctionToMySqlQueryRule::create(),
142 TableFunctionToProjectSetRule::create(),
144 ],
145 ApplyOrder::TopDown,
146 )
147});
148
149static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
150 OptimizationStage::new(
151 "Table Function To FileScan",
152 vec![TableFunctionToFileScanRule::create()],
153 ApplyOrder::TopDown,
154 )
155});
156
157static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
158 OptimizationStage::new(
159 "Table Function To PostgresQuery",
160 vec![TableFunctionToPostgresQueryRule::create()],
161 ApplyOrder::TopDown,
162 )
163});
164
165static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
166 OptimizationStage::new(
167 "Table Function To MySQL",
168 vec![TableFunctionToMySqlQueryRule::create()],
169 ApplyOrder::TopDown,
170 )
171});
172
173static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
174 OptimizationStage::new(
175 "Values Extract Project",
176 vec![ValuesExtractProjectRule::create()],
177 ApplyOrder::TopDown,
178 )
179});
180
181static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
182 OptimizationStage::new(
183 "Simple Unnesting",
184 vec![
185 MaxOneRowEliminateRule::create(),
187 ApplyToJoinRule::create(),
189 PullUpCorrelatedPredicateRule::create(),
191 PullUpCorrelatedPredicateAggRule::create(),
192 ],
193 ApplyOrder::BottomUp,
194 )
195});
196
197static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
198 OptimizationStage::new(
199 "Set Operation Merge",
200 vec![
201 UnionMergeRule::create(),
202 IntersectMergeRule::create(),
203 ExceptMergeRule::create(),
204 ],
205 ApplyOrder::BottomUp,
206 )
207});
208
209static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
210 LazyLock::new(|| {
211 OptimizationStage::new(
212 "General Unnesting(Translate Apply)",
213 vec![TranslateApplyRule::create(true)],
214 ApplyOrder::TopDown,
215 )
216 });
217
218static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
219 LazyLock::new(|| {
220 OptimizationStage::new(
221 "General Unnesting(Translate Apply)",
222 vec![TranslateApplyRule::create(false)],
223 ApplyOrder::TopDown,
224 )
225 });
226
227static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
228 OptimizationStage::new(
229 "General Unnesting(Push Down Apply)",
230 vec![
231 ApplyEliminateRule::create(),
232 ApplyAggTransposeRule::create(),
233 ApplyDedupTransposeRule::create(),
234 ApplyFilterTransposeRule::create(),
235 ApplyProjectTransposeRule::create(),
236 ApplyProjectSetTransposeRule::create(),
237 ApplyTopNTransposeRule::create(),
238 ApplyLimitTransposeRule::create(),
239 ApplyJoinTransposeRule::create(),
240 ApplyUnionTransposeRule::create(),
241 ApplyOverWindowTransposeRule::create(),
242 ApplyExpandTransposeRule::create(),
243 ApplyHopWindowTransposeRule::create(),
244 CrossJoinEliminateRule::create(),
245 ApplyShareEliminateRule::create(),
246 ],
247 ApplyOrder::TopDown,
248 )
249});
250
251static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
252 OptimizationStage::new(
253 "To MultiJoin",
254 vec![MergeMultiJoinRule::create()],
255 ApplyOrder::TopDown,
256 )
257});
258
259static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
260 OptimizationStage::new(
261 "Join Ordering".to_owned(),
262 vec![LeftDeepTreeJoinOrderingRule::create()],
263 ApplyOrder::TopDown,
264 )
265});
266
267static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
268 OptimizationStage::new(
269 "Join Ordering".to_owned(),
270 vec![BushyTreeJoinOrderingRule::create()],
271 ApplyOrder::TopDown,
272 )
273});
274
275static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
276 OptimizationStage::new(
277 "Push down filter with now into a left semijoin",
278 vec![
279 SplitNowAndRule::create(),
280 SplitNowOrRule::create(),
281 FilterWithNowToJoinRule::create(),
282 ],
283 ApplyOrder::TopDown,
284 )
285});
286
287static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
288 OptimizationStage::new(
289 "Push down the calculation of inputs of join's condition",
290 vec![PushCalculationOfJoinRule::create()],
291 ApplyOrder::TopDown,
292 )
293});
294
295static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
296 OptimizationStage::new(
297 "Convert Distinct Aggregation",
298 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
299 ApplyOrder::TopDown,
300 )
301});
302
303static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
304 OptimizationStage::new(
305 "Convert Distinct Aggregation",
306 vec![
307 UnionToDistinctRule::create(),
308 DistinctAggRule::create(false),
309 ],
310 ApplyOrder::TopDown,
311 )
312});
313
314static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
315 OptimizationStage::new(
316 "Simplify Aggregation",
317 vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
318 ApplyOrder::TopDown,
319 )
320});
321
322static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
323 OptimizationStage::new(
324 "Join Commute".to_owned(),
325 vec![JoinCommuteRule::create()],
326 ApplyOrder::TopDown,
327 )
328});
329
330static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
331 OptimizationStage::new(
332 "Project Remove",
333 vec![
334 ProjectMergeRule::create(),
336 ProjectEliminateRule::create(),
337 TrivialProjectToValuesRule::create(),
338 UnionInputValuesMergeRule::create(),
339 JoinProjectTransposeRule::create(),
340 ProjectJoinMergeRule::create(),
343 AggProjectMergeRule::create(),
344 ],
345 ApplyOrder::BottomUp,
346 )
347});
348
349static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
350 OptimizationStage::new(
351 "Split Over Window",
352 vec![OverWindowSplitRule::create()],
353 ApplyOrder::TopDown,
354 )
355});
356
357static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
361 OptimizationStage::new(
362 "Convert Over Window",
363 vec![
364 ProjectMergeRule::create(),
365 ProjectEliminateRule::create(),
366 TrivialProjectToValuesRule::create(),
367 UnionInputValuesMergeRule::create(),
368 OverWindowToAggAndJoinRule::create(),
369 OverWindowToTopNRule::create(),
370 ],
371 ApplyOrder::TopDown,
372 )
373});
374
375static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
376 OptimizationStage::new(
377 "Merge Over Window",
378 vec![OverWindowMergeRule::create()],
379 ApplyOrder::TopDown,
380 )
381});
382
383static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
384 OptimizationStage::new(
385 "Rewrite Like Expr",
386 vec![RewriteLikeExprRule::create()],
387 ApplyOrder::TopDown,
388 )
389});
390
391static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
392 OptimizationStage::new(
393 "TopN/SimpleAgg on Index",
394 vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
395 ApplyOrder::TopDown,
396 )
397});
398
399static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
400 OptimizationStage::new(
401 "Void always-false filter's downstream",
402 vec![AlwaysFalseFilterRule::create()],
403 ApplyOrder::TopDown,
404 )
405});
406
407static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
408 OptimizationStage::new(
409 "Push Down Limit",
410 vec![LimitPushDownRule::create()],
411 ApplyOrder::TopDown,
412 )
413});
414
415static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
416 OptimizationStage::new(
417 "Pull Up Hop",
418 vec![PullUpHopRule::create()],
419 ApplyOrder::BottomUp,
420 )
421});
422
423static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
424 OptimizationStage::new(
425 "Set Operation To Join",
426 vec![
427 IntersectToSemiJoinRule::create(),
428 ExceptToAntiJoinRule::create(),
429 ],
430 ApplyOrder::BottomUp,
431 )
432});
433
434static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
435 OptimizationStage::new(
436 "Grouping Sets",
437 vec![
438 GroupingSetsToExpandRule::create(),
439 ExpandToProjectRule::create(),
440 ],
441 ApplyOrder::TopDown,
442 )
443});
444
445static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
446 OptimizationStage::new(
447 "Common Sub Expression Extract",
448 vec![CommonSubExprExtractRule::create()],
449 ApplyOrder::TopDown,
450 )
451});
452
453static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
454 OptimizationStage::new(
455 "Logical Filter Expression Simplify",
456 vec![LogicalFilterExpressionSimplifyRule::create()],
457 ApplyOrder::TopDown,
458 )
459});
460
461static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
462 OptimizationStage::new(
463 "Rewrite Source For Batch",
464 vec![
465 SourceToKafkaScanRule::create(),
466 SourceToIcebergScanRule::create(),
467 ],
468 ApplyOrder::TopDown,
469 )
470});
471
472impl LogicalOptimizer {
473 pub fn predicate_pushdown(
474 plan: PlanRef,
475 explain_trace: bool,
476 ctx: &OptimizerContextRef,
477 ) -> PlanRef {
478 let plan = plan.predicate_pushdown(
479 Condition::true_cond(),
480 &mut PredicatePushdownContext::new(plan.clone()),
481 );
482 if explain_trace {
483 ctx.trace("Predicate Push Down:");
484 ctx.trace(plan.explain_to_string());
485 }
486 plan
487 }
488
489 pub fn subquery_unnesting(
490 mut plan: PlanRef,
491 enable_share_plan: bool,
492 explain_trace: bool,
493 ctx: &OptimizerContextRef,
494 ) -> Result<PlanRef> {
495 if !has_logical_apply(plan.clone()) {
497 return Ok(plan);
498 }
499 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
501 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
502 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
505 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
507 plan = if enable_share_plan {
511 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
512 } else {
513 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
514 };
515 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
516
517 plan.check_apply_elimination()?;
519
520 Ok(plan)
521 }
522
523 pub fn column_pruning(
524 mut plan: PlanRef,
525 explain_trace: bool,
526 ctx: &OptimizerContextRef,
527 ) -> PlanRef {
528 let required_cols = (0..plan.schema().len()).collect_vec();
529 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
530 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
531 if explain_trace {
533 ctx.trace("Prune Columns:");
534 ctx.trace(plan.explain_to_string());
535 }
536
537 if column_pruning_ctx.need_second_round() {
538 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
541 if explain_trace {
542 ctx.trace("Prune Columns (For DAG):");
543 ctx.trace(plan.explain_to_string());
544 }
545 }
546 plan
547 }
548
549 pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
550 let mut v = NowProcTimeFinder::default();
552 plan.visit_exprs_recursive(&mut v);
553 if !v.has() {
554 return plan;
555 }
556
557 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
558
559 let plan = plan.rewrite_exprs_recursive(&mut v);
560
561 if ctx.is_explain_trace() {
562 ctx.trace("Inline Now and ProcTime:");
563 ctx.trace(plan.explain_to_string());
564 }
565 plan
566 }
567
568 pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
569 let ctx = plan.ctx();
570 let explain_trace = ctx.is_explain_trace();
571
572 if explain_trace {
573 ctx.trace("Begin:");
574 ctx.trace(plan.explain_to_string());
575 }
576
577 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
579 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
581
582 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
586 if enable_share_plan {
587 plan = plan.common_subplan_sharing();
589 plan = plan.prune_share();
590 if explain_trace {
591 ctx.trace("Common Sub-plan Sharing:");
592 ctx.trace(plan.explain_to_string());
593 }
594 } else {
595 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
596
597 plan = ShareSourceRewriter::share_source(plan);
601 if explain_trace {
602 ctx.trace("Share Source:");
603 ctx.trace(plan.explain_to_string());
604 }
605 }
606 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
607 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
608 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
611 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
613
614 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
615 if has_logical_max_one_row(plan.clone()) {
616 bail!("Scalar subquery might produce more than one row.");
620 }
621
622 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
625
626 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
628
629 if plan.ctx().session_ctx().config().enable_join_ordering() {
630 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
634
635 if plan
637 .ctx()
638 .session_ctx()
639 .config()
640 .streaming_enable_bushy_join()
641 {
642 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
643 } else {
644 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
645 }
646 }
647
648 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
651
652 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
654
655 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
657
658 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
659 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
662 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
663 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
664
665 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
666 plan = if force_split_distinct_agg {
669 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
670 } else {
671 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
672 };
673
674 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
675
676 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
677
678 plan = Self::column_pruning(plan, explain_trace, &ctx);
680 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
681
682 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
683
684 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
685
686 #[cfg(debug_assertions)]
687 InputRefValidator.validate(plan.clone());
688
689 if ctx.is_explain_logical() {
690 match ctx.explain_format() {
691 ExplainFormat::Text => {
692 ctx.store_logical(plan.explain_to_string());
693 }
694 ExplainFormat::Json => {
695 ctx.store_logical(plan.explain_to_json());
696 }
697 ExplainFormat::Xml => {
698 ctx.store_logical(plan.explain_to_xml());
699 }
700 ExplainFormat::Yaml => {
701 ctx.store_logical(plan.explain_to_yaml());
702 }
703 ExplainFormat::Dot => {
704 ctx.store_logical(plan.explain_to_dot());
705 }
706 }
707 }
708
709 Ok(plan)
710 }
711
712 pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
713 let ctx = plan.ctx();
714 let explain_trace = ctx.is_explain_trace();
715
716 if explain_trace {
717 ctx.trace("Begin:");
718 ctx.trace(plan.explain_to_string());
719 }
720
721 plan = Self::inline_now_proc_time(plan, &ctx);
723
724 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
726
727 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
728 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
729 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
730 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
731 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
732 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
733 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
735 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
736 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
737 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
739
740 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
741
742 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
746
747 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
749 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
750
751 if plan.ctx().session_ctx().config().enable_join_ordering() {
752 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
756
757 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
759 }
760
761 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
764 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
765 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
766 }
767
768 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
770
771 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
772 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
775 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
776 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
777 }
778 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
779 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
780
781 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
783
784 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
785
786 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
787
788 plan = Self::column_pruning(plan, explain_trace, &ctx);
790 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
791 (#[allow(unused_assignments)]
792 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
793 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
794 }
795
796 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
797
798 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
799
800 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
801
802 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
803
804 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
805
806 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
807
808 #[cfg(debug_assertions)]
809 InputRefValidator.validate(plan.clone());
810
811 if ctx.is_explain_logical() {
812 match ctx.explain_format() {
813 ExplainFormat::Text => {
814 ctx.store_logical(plan.explain_to_string());
815 }
816 ExplainFormat::Json => {
817 ctx.store_logical(plan.explain_to_json());
818 }
819 ExplainFormat::Xml => {
820 ctx.store_logical(plan.explain_to_xml());
821 }
822 ExplainFormat::Yaml => {
823 ctx.store_logical(plan.explain_to_yaml());
824 }
825 ExplainFormat::Dot => {
826 ctx.store_logical(plan.explain_to_dot());
827 }
828 }
829 }
830
831 Ok(plan)
832 }
833}