1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42 stage_name: &str,
43 ) -> Result<PlanRef> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage,
69 ) -> Result<PlanRef> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage,
80 ) -> Result<PlanRef> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
/// A named group of rewrite rules together with the order in which they are applied.
///
/// `PlanRef::optimize_by_rules` and `PlanRef::optimize_by_rules_until_fix_point`
/// build a `HeuristicOptimizer` from `apply_order` and `rules`; `stage_name`
/// is the label written to the explain trace for the stage.
pub struct OptimizationStage {
    /// Label written to the explain trace when this stage applied a rule.
    stage_name: String,
    /// Rules handed to the `HeuristicOptimizer` for this stage.
    rules: Vec<BoxedRule>,
    /// Apply order handed to the `HeuristicOptimizer` alongside `rules`.
    apply_order: ApplyOrder,
}
96
97impl OptimizationStage {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Namespace type for the logical optimization entry points; it has no fields.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToPostgresQueryRule::create(),
140 TableFunctionToMySqlQueryRule::create(),
142 TableFunctionToProjectSetRule::create(),
144 ],
145 ApplyOrder::TopDown,
146 )
147});
148
149static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
150 OptimizationStage::new(
151 "Table Function To FileScan",
152 vec![TableFunctionToFileScanRule::create()],
153 ApplyOrder::TopDown,
154 )
155});
156
157static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
158 OptimizationStage::new(
159 "Table Function To PostgresQuery",
160 vec![TableFunctionToPostgresQueryRule::create()],
161 ApplyOrder::TopDown,
162 )
163});
164
165static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
166 OptimizationStage::new(
167 "Table Function To MySQL",
168 vec![TableFunctionToMySqlQueryRule::create()],
169 ApplyOrder::TopDown,
170 )
171});
172
173static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
174 OptimizationStage::new(
175 "Values Extract Project",
176 vec![ValuesExtractProjectRule::create()],
177 ApplyOrder::TopDown,
178 )
179});
180
181static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
182 OptimizationStage::new(
183 "Simple Unnesting",
184 vec![
185 MaxOneRowEliminateRule::create(),
187 ApplyToJoinRule::create(),
189 PullUpCorrelatedPredicateRule::create(),
191 PullUpCorrelatedPredicateAggRule::create(),
192 ],
193 ApplyOrder::BottomUp,
194 )
195});
196
197static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
198 OptimizationStage::new(
199 "Set Operation Merge",
200 vec![
201 UnionMergeRule::create(),
202 IntersectMergeRule::create(),
203 ExceptMergeRule::create(),
204 ],
205 ApplyOrder::BottomUp,
206 )
207});
208
209static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
210 LazyLock::new(|| {
211 OptimizationStage::new(
212 "General Unnesting(Translate Apply)",
213 vec![TranslateApplyRule::create(true)],
214 ApplyOrder::TopDown,
215 )
216 });
217
218static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
219 LazyLock::new(|| {
220 OptimizationStage::new(
221 "General Unnesting(Translate Apply)",
222 vec![TranslateApplyRule::create(false)],
223 ApplyOrder::TopDown,
224 )
225 });
226
227static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
228 OptimizationStage::new(
229 "General Unnesting(Push Down Apply)",
230 vec![
231 ApplyEliminateRule::create(),
232 ApplyAggTransposeRule::create(),
233 ApplyDedupTransposeRule::create(),
234 ApplyFilterTransposeRule::create(),
235 ApplyProjectTransposeRule::create(),
236 ApplyProjectSetTransposeRule::create(),
237 ApplyTopNTransposeRule::create(),
238 ApplyLimitTransposeRule::create(),
239 ApplyJoinTransposeRule::create(),
240 ApplyUnionTransposeRule::create(),
241 ApplyOverWindowTransposeRule::create(),
242 ApplyExpandTransposeRule::create(),
243 ApplyHopWindowTransposeRule::create(),
244 CrossJoinEliminateRule::create(),
245 ApplyShareEliminateRule::create(),
246 ],
247 ApplyOrder::TopDown,
248 )
249});
250
251static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
252 OptimizationStage::new(
253 "To MultiJoin",
254 vec![MergeMultiJoinRule::create()],
255 ApplyOrder::TopDown,
256 )
257});
258
259static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
260 OptimizationStage::new(
261 "Join Ordering".to_owned(),
262 vec![LeftDeepTreeJoinOrderingRule::create()],
263 ApplyOrder::TopDown,
264 )
265});
266
267static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
268 OptimizationStage::new(
269 "Join Ordering".to_owned(),
270 vec![BushyTreeJoinOrderingRule::create()],
271 ApplyOrder::TopDown,
272 )
273});
274
275static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
276 OptimizationStage::new(
277 "Push down filter with now into a left semijoin",
278 vec![
279 SplitNowAndRule::create(),
280 SplitNowOrRule::create(),
281 FilterWithNowToJoinRule::create(),
282 ],
283 ApplyOrder::TopDown,
284 )
285});
286
287static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
288 OptimizationStage::new(
289 "Push down the calculation of inputs of join's condition",
290 vec![PushCalculationOfJoinRule::create()],
291 ApplyOrder::TopDown,
292 )
293});
294
295static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
296 OptimizationStage::new(
297 "Convert Distinct Aggregation",
298 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
299 ApplyOrder::TopDown,
300 )
301});
302
303static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
304 OptimizationStage::new(
305 "Convert Distinct Aggregation",
306 vec![
307 UnionToDistinctRule::create(),
308 DistinctAggRule::create(false),
309 ],
310 ApplyOrder::TopDown,
311 )
312});
313
314static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
315 OptimizationStage::new(
316 "Simplify Aggregation",
317 vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
318 ApplyOrder::TopDown,
319 )
320});
321
322static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
323 OptimizationStage::new(
324 "Join Commute".to_owned(),
325 vec![JoinCommuteRule::create()],
326 ApplyOrder::TopDown,
327 )
328});
329
330static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
331 OptimizationStage::new(
332 "Constant Output Operator Remove",
333 vec![EmptyAggRemoveRule::create()],
334 ApplyOrder::TopDown,
335 )
336});
337
338static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
339 OptimizationStage::new(
340 "Project Remove",
341 vec![
342 ProjectMergeRule::create(),
344 ProjectEliminateRule::create(),
345 TrivialProjectToValuesRule::create(),
346 UnionInputValuesMergeRule::create(),
347 JoinProjectTransposeRule::create(),
348 ProjectJoinMergeRule::create(),
351 AggProjectMergeRule::create(),
352 ],
353 ApplyOrder::BottomUp,
354 )
355});
356
357static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
358 OptimizationStage::new(
359 "Split Over Window",
360 vec![OverWindowSplitRule::create()],
361 ApplyOrder::TopDown,
362 )
363});
364
365static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
369 OptimizationStage::new(
370 "Convert Over Window",
371 vec![
372 ProjectMergeRule::create(),
373 ProjectEliminateRule::create(),
374 TrivialProjectToValuesRule::create(),
375 UnionInputValuesMergeRule::create(),
376 OverWindowToAggAndJoinRule::create(),
377 OverWindowToTopNRule::create(),
378 ],
379 ApplyOrder::TopDown,
380 )
381});
382
383static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
384 OptimizationStage::new(
385 "Merge Over Window",
386 vec![OverWindowMergeRule::create()],
387 ApplyOrder::TopDown,
388 )
389});
390
391static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
392 OptimizationStage::new(
393 "Rewrite Like Expr",
394 vec![RewriteLikeExprRule::create()],
395 ApplyOrder::TopDown,
396 )
397});
398
399static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
400 OptimizationStage::new(
401 "TopN/SimpleAgg on Index",
402 vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
403 ApplyOrder::TopDown,
404 )
405});
406
407static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
408 OptimizationStage::new(
409 "Void always-false filter's downstream",
410 vec![AlwaysFalseFilterRule::create()],
411 ApplyOrder::TopDown,
412 )
413});
414
415static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
416 OptimizationStage::new(
417 "Push Down Limit",
418 vec![LimitPushDownRule::create()],
419 ApplyOrder::TopDown,
420 )
421});
422
423static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
424 OptimizationStage::new(
425 "Pull Up Hop",
426 vec![PullUpHopRule::create()],
427 ApplyOrder::BottomUp,
428 )
429});
430
431static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
432 OptimizationStage::new(
433 "Set Operation To Join",
434 vec![
435 IntersectToSemiJoinRule::create(),
436 ExceptToAntiJoinRule::create(),
437 ],
438 ApplyOrder::BottomUp,
439 )
440});
441
442static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
443 OptimizationStage::new(
444 "Grouping Sets",
445 vec![
446 GroupingSetsToExpandRule::create(),
447 ExpandToProjectRule::create(),
448 ],
449 ApplyOrder::TopDown,
450 )
451});
452
453static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
454 OptimizationStage::new(
455 "Common Sub Expression Extract",
456 vec![CommonSubExprExtractRule::create()],
457 ApplyOrder::TopDown,
458 )
459});
460
461static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
462 OptimizationStage::new(
463 "Logical Filter Expression Simplify",
464 vec![LogicalFilterExpressionSimplifyRule::create()],
465 ApplyOrder::TopDown,
466 )
467});
468
469static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
470 OptimizationStage::new(
471 "Rewrite Source For Batch",
472 vec![
473 SourceToKafkaScanRule::create(),
474 SourceToIcebergScanRule::create(),
475 ],
476 ApplyOrder::TopDown,
477 )
478});
479
480impl LogicalOptimizer {
481 pub fn predicate_pushdown(
482 plan: PlanRef,
483 explain_trace: bool,
484 ctx: &OptimizerContextRef,
485 ) -> PlanRef {
486 let plan = plan.predicate_pushdown(
487 Condition::true_cond(),
488 &mut PredicatePushdownContext::new(plan.clone()),
489 );
490 if explain_trace {
491 ctx.trace("Predicate Push Down:");
492 ctx.trace(plan.explain_to_string());
493 }
494 plan
495 }
496
497 pub fn subquery_unnesting(
498 mut plan: PlanRef,
499 enable_share_plan: bool,
500 explain_trace: bool,
501 ctx: &OptimizerContextRef,
502 ) -> Result<PlanRef> {
503 if !has_logical_apply(plan.clone()) {
505 return Ok(plan);
506 }
507 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
509 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
510 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
513 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
515 plan = if enable_share_plan {
519 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
520 } else {
521 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
522 };
523 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
524
525 plan.check_apply_elimination()?;
527
528 Ok(plan)
529 }
530
531 pub fn column_pruning(
532 mut plan: PlanRef,
533 explain_trace: bool,
534 ctx: &OptimizerContextRef,
535 ) -> PlanRef {
536 let required_cols = (0..plan.schema().len()).collect_vec();
537 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
538 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
539 if explain_trace {
541 ctx.trace("Prune Columns:");
542 ctx.trace(plan.explain_to_string());
543 }
544
545 if column_pruning_ctx.need_second_round() {
546 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
549 if explain_trace {
550 ctx.trace("Prune Columns (For DAG):");
551 ctx.trace(plan.explain_to_string());
552 }
553 }
554 plan
555 }
556
557 pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
558 let mut v = NowProcTimeFinder::default();
560 plan.visit_exprs_recursive(&mut v);
561 if !v.has() {
562 return plan;
563 }
564
565 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
566
567 let plan = plan.rewrite_exprs_recursive(&mut v);
568
569 if ctx.is_explain_trace() {
570 ctx.trace("Inline Now and ProcTime:");
571 ctx.trace(plan.explain_to_string());
572 }
573 plan
574 }
575
576 pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
577 let ctx = plan.ctx();
578 let explain_trace = ctx.is_explain_trace();
579
580 if explain_trace {
581 ctx.trace("Begin:");
582 ctx.trace(plan.explain_to_string());
583 }
584
585 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
587 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
589 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
591
592 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
596 if enable_share_plan {
597 plan = plan.common_subplan_sharing();
599 plan = plan.prune_share();
600 if explain_trace {
601 ctx.trace("Common Sub-plan Sharing:");
602 ctx.trace(plan.explain_to_string());
603 }
604 } else {
605 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
606
607 plan = ShareSourceRewriter::share_source(plan);
611 if explain_trace {
612 ctx.trace("Share Source:");
613 ctx.trace(plan.explain_to_string());
614 }
615 }
616 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
617 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
618 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
621 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
623
624 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
625 if has_logical_max_one_row(plan.clone()) {
626 bail!("Scalar subquery might produce more than one row.");
630 }
631
632 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
635
636 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
638
639 if plan.ctx().session_ctx().config().enable_join_ordering() {
640 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
644
645 if plan
647 .ctx()
648 .session_ctx()
649 .config()
650 .streaming_enable_bushy_join()
651 {
652 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
653 } else {
654 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
655 }
656 }
657
658 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
661
662 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
664
665 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
667
668 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
669 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
672 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
673 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
674
675 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
676 plan = if force_split_distinct_agg {
679 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
680 } else {
681 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
682 };
683
684 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
685
686 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
687
688 plan = Self::column_pruning(plan, explain_trace, &ctx);
690 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
691
692 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
693 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
694
695 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
696
697 #[cfg(debug_assertions)]
698 InputRefValidator.validate(plan.clone());
699
700 if ctx.is_explain_logical() {
701 match ctx.explain_format() {
702 ExplainFormat::Text => {
703 ctx.store_logical(plan.explain_to_string());
704 }
705 ExplainFormat::Json => {
706 ctx.store_logical(plan.explain_to_json());
707 }
708 ExplainFormat::Xml => {
709 ctx.store_logical(plan.explain_to_xml());
710 }
711 ExplainFormat::Yaml => {
712 ctx.store_logical(plan.explain_to_yaml());
713 }
714 ExplainFormat::Dot => {
715 ctx.store_logical(plan.explain_to_dot());
716 }
717 }
718 }
719
720 Ok(plan)
721 }
722
723 pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
724 let ctx = plan.ctx();
725 let explain_trace = ctx.is_explain_trace();
726
727 if explain_trace {
728 ctx.trace("Begin:");
729 ctx.trace(plan.explain_to_string());
730 }
731
732 plan = Self::inline_now_proc_time(plan, &ctx);
734
735 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
737
738 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
739 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
740 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
741 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
742 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
743 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
744 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
746 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
747 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
748 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
750
751 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
752
753 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
757
758 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
760 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
761
762 if plan.ctx().session_ctx().config().enable_join_ordering() {
763 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
767
768 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
770 }
771
772 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
775 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
776 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
777 }
778
779 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
781
782 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
783 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
786 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
787 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
788 }
789 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
790 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
791
792 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
794
795 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
796
797 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
798
799 plan = Self::column_pruning(plan, explain_trace, &ctx);
801 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
802 (#[allow(unused_assignments)]
803 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
804 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
805 }
806
807 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
808 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
809
810 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
811
812 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
813
814 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
815
816 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
817
818 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
819
820 #[cfg(debug_assertions)]
821 InputRefValidator.validate(plan.clone());
822
823 if ctx.is_explain_logical() {
824 match ctx.explain_format() {
825 ExplainFormat::Text => {
826 ctx.store_logical(plan.explain_to_string());
827 }
828 ExplainFormat::Json => {
829 ctx.store_logical(plan.explain_to_json());
830 }
831 ExplainFormat::Xml => {
832 ctx.store_logical(plan.explain_to_xml());
833 }
834 ExplainFormat::Yaml => {
835 ctx.store_logical(plan.explain_to_yaml());
836 }
837 ExplainFormat::Dot => {
838 ctx.store_logical(plan.explain_to_dot());
839 }
840 }
841 }
842
843 Ok(plan)
844 }
845}