1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42 stage_name: &str,
43 ) -> Result<PlanRef> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage,
69 ) -> Result<PlanRef> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage,
80 ) -> Result<PlanRef> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
/// A named set of rewrite rules together with the order in which they are applied.
///
/// A stage is handed to `PlanRef::optimize_by_rules` or
/// `PlanRef::optimize_by_rules_until_fix_point`, which build a `HeuristicOptimizer`
/// from `apply_order` and `rules`.
pub struct OptimizationStage {
    /// Label printed in the explain trace when the stage applied at least one rule.
    stage_name: String,
    /// Rules passed to the `HeuristicOptimizer` for this stage.
    rules: Vec<BoxedRule>,
    /// Application order passed to the `HeuristicOptimizer` for this stage.
    apply_order: ApplyOrder,
}
96
97impl OptimizationStage {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Field-less type whose associated functions drive logical plan optimization, including
/// the `gen_optimized_logical_plan_for_stream` and `gen_optimized_logical_plan_for_batch`
/// pipelines.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToInternalBackfillProgressRule::create(),
140 TableFunctionToInternalSourceBackfillProgressRule::create(),
142 TableFunctionToPostgresQueryRule::create(),
144 TableFunctionToMySqlQueryRule::create(),
146 TableFunctionToProjectSetRule::create(),
148 ],
149 ApplyOrder::TopDown,
150 )
151});
152
153static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
154 OptimizationStage::new(
155 "Table Function To FileScan",
156 vec![TableFunctionToFileScanRule::create()],
157 ApplyOrder::TopDown,
158 )
159});
160
161static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
162 OptimizationStage::new(
163 "Table Function To PostgresQuery",
164 vec![TableFunctionToPostgresQueryRule::create()],
165 ApplyOrder::TopDown,
166 )
167});
168
169static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
170 OptimizationStage::new(
171 "Table Function To MySQL",
172 vec![TableFunctionToMySqlQueryRule::create()],
173 ApplyOrder::TopDown,
174 )
175});
176
177static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
178 LazyLock::new(|| {
179 OptimizationStage::new(
180 "Table Function To Internal Backfill Progress",
181 vec![TableFunctionToInternalBackfillProgressRule::create()],
182 ApplyOrder::TopDown,
183 )
184 });
185
186static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
187 LazyLock::new(|| {
188 OptimizationStage::new(
189 "Table Function To Internal Source Backfill Progress",
190 vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
191 ApplyOrder::TopDown,
192 )
193 });
194
195static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
196 OptimizationStage::new(
197 "Values Extract Project",
198 vec![ValuesExtractProjectRule::create()],
199 ApplyOrder::TopDown,
200 )
201});
202
203static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
204 OptimizationStage::new(
205 "Simple Unnesting",
206 vec![
207 MaxOneRowEliminateRule::create(),
209 ApplyToJoinRule::create(),
211 PullUpCorrelatedPredicateRule::create(),
213 PullUpCorrelatedPredicateAggRule::create(),
214 ],
215 ApplyOrder::BottomUp,
216 )
217});
218
219static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
220 OptimizationStage::new(
221 "Set Operation Merge",
222 vec![
223 UnionMergeRule::create(),
224 IntersectMergeRule::create(),
225 ExceptMergeRule::create(),
226 ],
227 ApplyOrder::BottomUp,
228 )
229});
230
231static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
232 LazyLock::new(|| {
233 OptimizationStage::new(
234 "General Unnesting(Translate Apply)",
235 vec![TranslateApplyRule::create(true)],
236 ApplyOrder::TopDown,
237 )
238 });
239
240static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
241 LazyLock::new(|| {
242 OptimizationStage::new(
243 "General Unnesting(Translate Apply)",
244 vec![TranslateApplyRule::create(false)],
245 ApplyOrder::TopDown,
246 )
247 });
248
249static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
250 OptimizationStage::new(
251 "General Unnesting(Push Down Apply)",
252 vec![
253 ApplyEliminateRule::create(),
254 ApplyAggTransposeRule::create(),
255 ApplyDedupTransposeRule::create(),
256 ApplyFilterTransposeRule::create(),
257 ApplyProjectTransposeRule::create(),
258 ApplyProjectSetTransposeRule::create(),
259 ApplyTopNTransposeRule::create(),
260 ApplyLimitTransposeRule::create(),
261 ApplyJoinTransposeRule::create(),
262 ApplyUnionTransposeRule::create(),
263 ApplyOverWindowTransposeRule::create(),
264 ApplyExpandTransposeRule::create(),
265 ApplyHopWindowTransposeRule::create(),
266 CrossJoinEliminateRule::create(),
267 ApplyShareEliminateRule::create(),
268 ],
269 ApplyOrder::TopDown,
270 )
271});
272
273static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
274 OptimizationStage::new(
275 "To MultiJoin",
276 vec![MergeMultiJoinRule::create()],
277 ApplyOrder::TopDown,
278 )
279});
280
281static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
282 OptimizationStage::new(
283 "Join Ordering".to_owned(),
284 vec![LeftDeepTreeJoinOrderingRule::create()],
285 ApplyOrder::TopDown,
286 )
287});
288
289static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
290 OptimizationStage::new(
291 "Join Ordering".to_owned(),
292 vec![BushyTreeJoinOrderingRule::create()],
293 ApplyOrder::TopDown,
294 )
295});
296
297static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
298 OptimizationStage::new(
299 "Push down filter with now into a left semijoin",
300 vec![
301 SplitNowAndRule::create(),
302 SplitNowOrRule::create(),
303 FilterWithNowToJoinRule::create(),
304 ],
305 ApplyOrder::TopDown,
306 )
307});
308
309static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
310 OptimizationStage::new(
311 "Push down the calculation of inputs of join's condition",
312 vec![PushCalculationOfJoinRule::create()],
313 ApplyOrder::TopDown,
314 )
315});
316
317static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
318 OptimizationStage::new(
319 "Convert Distinct Aggregation",
320 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
321 ApplyOrder::TopDown,
322 )
323});
324
325static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
326 OptimizationStage::new(
327 "Convert Distinct Aggregation",
328 vec![
329 UnionToDistinctRule::create(),
330 DistinctAggRule::create(false),
331 ],
332 ApplyOrder::TopDown,
333 )
334});
335
336static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
337 OptimizationStage::new(
338 "Simplify Aggregation",
339 vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
340 ApplyOrder::TopDown,
341 )
342});
343
344static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
345 OptimizationStage::new(
346 "Join Commute".to_owned(),
347 vec![JoinCommuteRule::create()],
348 ApplyOrder::TopDown,
349 )
350});
351
352static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
353 OptimizationStage::new(
354 "Constant Output Operator Remove",
355 vec![EmptyAggRemoveRule::create()],
356 ApplyOrder::TopDown,
357 )
358});
359
360static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
361 OptimizationStage::new(
362 "Project Remove",
363 vec![
364 ProjectMergeRule::create(),
366 ProjectEliminateRule::create(),
367 TrivialProjectToValuesRule::create(),
368 UnionInputValuesMergeRule::create(),
369 JoinProjectTransposeRule::create(),
370 ProjectJoinMergeRule::create(),
373 AggProjectMergeRule::create(),
374 ],
375 ApplyOrder::BottomUp,
376 )
377});
378
379static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
380 OptimizationStage::new(
381 "Split Over Window",
382 vec![OverWindowSplitRule::create()],
383 ApplyOrder::TopDown,
384 )
385});
386
387static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
391 OptimizationStage::new(
392 "Convert Over Window",
393 vec![
394 ProjectMergeRule::create(),
395 ProjectEliminateRule::create(),
396 TrivialProjectToValuesRule::create(),
397 UnionInputValuesMergeRule::create(),
398 OverWindowToAggAndJoinRule::create(),
399 OverWindowToTopNRule::create(),
400 ],
401 ApplyOrder::TopDown,
402 )
403});
404
405static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
406 OptimizationStage::new(
407 "Merge Over Window",
408 vec![OverWindowMergeRule::create()],
409 ApplyOrder::TopDown,
410 )
411});
412
413static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
414 OptimizationStage::new(
415 "Rewrite Like Expr",
416 vec![RewriteLikeExprRule::create()],
417 ApplyOrder::TopDown,
418 )
419});
420
421static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
422 OptimizationStage::new(
423 "TopN/SimpleAgg on Index",
424 vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
425 ApplyOrder::TopDown,
426 )
427});
428
429static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
430 OptimizationStage::new(
431 "Void always-false filter's downstream",
432 vec![AlwaysFalseFilterRule::create()],
433 ApplyOrder::TopDown,
434 )
435});
436
437static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
438 OptimizationStage::new(
439 "Push Down Limit",
440 vec![LimitPushDownRule::create()],
441 ApplyOrder::TopDown,
442 )
443});
444
445static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
446 OptimizationStage::new(
447 "Pull Up Hop",
448 vec![PullUpHopRule::create()],
449 ApplyOrder::BottomUp,
450 )
451});
452
453static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
454 OptimizationStage::new(
455 "Set Operation To Join",
456 vec![
457 IntersectToSemiJoinRule::create(),
458 ExceptToAntiJoinRule::create(),
459 ],
460 ApplyOrder::BottomUp,
461 )
462});
463
464static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
465 OptimizationStage::new(
466 "Grouping Sets",
467 vec![
468 GroupingSetsToExpandRule::create(),
469 ExpandToProjectRule::create(),
470 ],
471 ApplyOrder::TopDown,
472 )
473});
474
475static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
476 OptimizationStage::new(
477 "Common Sub Expression Extract",
478 vec![CommonSubExprExtractRule::create()],
479 ApplyOrder::TopDown,
480 )
481});
482
483static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
484 OptimizationStage::new(
485 "Logical Filter Expression Simplify",
486 vec![LogicalFilterExpressionSimplifyRule::create()],
487 ApplyOrder::TopDown,
488 )
489});
490
491static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
492 OptimizationStage::new(
493 "Rewrite Source For Batch",
494 vec![
495 SourceToKafkaScanRule::create(),
496 SourceToIcebergScanRule::create(),
497 ],
498 ApplyOrder::TopDown,
499 )
500});
501
502impl LogicalOptimizer {
503 pub fn predicate_pushdown(
504 plan: PlanRef,
505 explain_trace: bool,
506 ctx: &OptimizerContextRef,
507 ) -> PlanRef {
508 let plan = plan.predicate_pushdown(
509 Condition::true_cond(),
510 &mut PredicatePushdownContext::new(plan.clone()),
511 );
512 if explain_trace {
513 ctx.trace("Predicate Push Down:");
514 ctx.trace(plan.explain_to_string());
515 }
516 plan
517 }
518
519 pub fn subquery_unnesting(
520 mut plan: PlanRef,
521 enable_share_plan: bool,
522 explain_trace: bool,
523 ctx: &OptimizerContextRef,
524 ) -> Result<PlanRef> {
525 if !has_logical_apply(plan.clone()) {
527 return Ok(plan);
528 }
529 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
531 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
532 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
535 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
537 plan = if enable_share_plan {
541 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
542 } else {
543 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
544 };
545 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
546
547 plan.check_apply_elimination()?;
549
550 Ok(plan)
551 }
552
553 pub fn column_pruning(
554 mut plan: PlanRef,
555 explain_trace: bool,
556 ctx: &OptimizerContextRef,
557 ) -> PlanRef {
558 let required_cols = (0..plan.schema().len()).collect_vec();
559 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
560 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
561 if explain_trace {
563 ctx.trace("Prune Columns:");
564 ctx.trace(plan.explain_to_string());
565 }
566
567 if column_pruning_ctx.need_second_round() {
568 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
571 if explain_trace {
572 ctx.trace("Prune Columns (For DAG):");
573 ctx.trace(plan.explain_to_string());
574 }
575 }
576 plan
577 }
578
579 pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
580 let mut v = NowProcTimeFinder::default();
582 plan.visit_exprs_recursive(&mut v);
583 if !v.has() {
584 return plan;
585 }
586
587 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
588
589 let plan = plan.rewrite_exprs_recursive(&mut v);
590
591 if ctx.is_explain_trace() {
592 ctx.trace("Inline Now and ProcTime:");
593 ctx.trace(plan.explain_to_string());
594 }
595 plan
596 }
597
598 pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
599 let ctx = plan.ctx();
600 let explain_trace = ctx.is_explain_trace();
601
602 if explain_trace {
603 ctx.trace("Begin:");
604 ctx.trace(plan.explain_to_string());
605 }
606
607 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
609 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
611 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
613
614 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
618 if enable_share_plan {
619 plan = plan.common_subplan_sharing();
621 plan = plan.prune_share();
622 if explain_trace {
623 ctx.trace("Common Sub-plan Sharing:");
624 ctx.trace(plan.explain_to_string());
625 }
626 } else {
627 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
628
629 plan = ShareSourceRewriter::share_source(plan);
633 if explain_trace {
634 ctx.trace("Share Source:");
635 ctx.trace(plan.explain_to_string());
636 }
637 }
638 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
639 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
640 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
643 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
645
646 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
647 if has_logical_max_one_row(plan.clone()) {
648 bail!("Scalar subquery might produce more than one row.");
652 }
653
654 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
657
658 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
660
661 if plan.ctx().session_ctx().config().enable_join_ordering() {
662 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
666
667 if plan
669 .ctx()
670 .session_ctx()
671 .config()
672 .streaming_enable_bushy_join()
673 {
674 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
675 } else {
676 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
677 }
678 }
679
680 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
683
684 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
686
687 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
689
690 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
691 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
694 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
695 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
696
697 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
698 plan = if force_split_distinct_agg {
701 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
702 } else {
703 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
704 };
705
706 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
707
708 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
709
710 plan = Self::column_pruning(plan, explain_trace, &ctx);
712 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
713
714 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
715 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
716
717 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
718
719 #[cfg(debug_assertions)]
720 InputRefValidator.validate(plan.clone());
721
722 if ctx.is_explain_logical() {
723 match ctx.explain_format() {
724 ExplainFormat::Text => {
725 ctx.store_logical(plan.explain_to_string());
726 }
727 ExplainFormat::Json => {
728 ctx.store_logical(plan.explain_to_json());
729 }
730 ExplainFormat::Xml => {
731 ctx.store_logical(plan.explain_to_xml());
732 }
733 ExplainFormat::Yaml => {
734 ctx.store_logical(plan.explain_to_yaml());
735 }
736 ExplainFormat::Dot => {
737 ctx.store_logical(plan.explain_to_dot());
738 }
739 }
740 }
741
742 Ok(plan)
743 }
744
745 pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
746 let ctx = plan.ctx();
747 let explain_trace = ctx.is_explain_trace();
748
749 if explain_trace {
750 ctx.trace("Begin:");
751 ctx.trace(plan.explain_to_string());
752 }
753
754 plan = Self::inline_now_proc_time(plan, &ctx);
756
757 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
759
760 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
761 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
762 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
763 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
764 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
765 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
766 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
768 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
769 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
770 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
771 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
772 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
774
775 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
776
777 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
781
782 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
784 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
785
786 if plan.ctx().session_ctx().config().enable_join_ordering() {
787 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
791
792 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
794 }
795
796 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
799 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
800 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
801 }
802
803 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
805
806 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
807 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
810 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
811 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
812 }
813 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
814 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
815
816 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
818
819 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
820
821 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
822
823 plan = Self::column_pruning(plan, explain_trace, &ctx);
825 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
826 (#[allow(unused_assignments)]
827 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
828 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
829 }
830
831 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
832 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
833
834 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
835
836 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
837
838 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
839
840 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
841
842 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
843
844 #[cfg(debug_assertions)]
845 InputRefValidator.validate(plan.clone());
846
847 if ctx.is_explain_logical() {
848 match ctx.explain_format() {
849 ExplainFormat::Text => {
850 ctx.store_logical(plan.explain_to_string());
851 }
852 ExplainFormat::Json => {
853 ctx.store_logical(plan.explain_to_json());
854 }
855 ExplainFormat::Xml => {
856 ctx.store_logical(plan.explain_to_xml());
857 }
858 ExplainFormat::Yaml => {
859 ctx.store_logical(plan.explain_to_yaml());
860 }
861 ExplainFormat::Dot => {
862 ctx.store_logical(plan.explain_to_dot());
863 }
864 }
865 }
866
867 Ok(plan)
868 }
869}