1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::RewriteExprsRecursive;
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl PlanRef {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_>,
42 stage_name: &str,
43 ) -> Result<PlanRef> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage,
69 ) -> Result<PlanRef> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage,
80 ) -> Result<PlanRef> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
91pub struct OptimizationStage {
92 stage_name: String,
93 rules: Vec<BoxedRule>,
94 apply_order: ApplyOrder,
95}
96
97impl OptimizationStage {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
/// Field-less type that groups the logical optimization entry points as associated
/// functions.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToInternalBackfillProgressRule::create(),
140 TableFunctionToPostgresQueryRule::create(),
142 TableFunctionToMySqlQueryRule::create(),
144 TableFunctionToProjectSetRule::create(),
146 ],
147 ApplyOrder::TopDown,
148 )
149});
150
151static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
152 OptimizationStage::new(
153 "Table Function To FileScan",
154 vec![TableFunctionToFileScanRule::create()],
155 ApplyOrder::TopDown,
156 )
157});
158
159static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
160 OptimizationStage::new(
161 "Table Function To PostgresQuery",
162 vec![TableFunctionToPostgresQueryRule::create()],
163 ApplyOrder::TopDown,
164 )
165});
166
167static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
168 OptimizationStage::new(
169 "Table Function To MySQL",
170 vec![TableFunctionToMySqlQueryRule::create()],
171 ApplyOrder::TopDown,
172 )
173});
174
175static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
176 LazyLock::new(|| {
177 OptimizationStage::new(
178 "Table Function To Internal Backfill Progress",
179 vec![TableFunctionToInternalBackfillProgressRule::create()],
180 ApplyOrder::TopDown,
181 )
182 });
183
184static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
185 OptimizationStage::new(
186 "Values Extract Project",
187 vec![ValuesExtractProjectRule::create()],
188 ApplyOrder::TopDown,
189 )
190});
191
192static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
193 OptimizationStage::new(
194 "Simple Unnesting",
195 vec![
196 MaxOneRowEliminateRule::create(),
198 ApplyToJoinRule::create(),
200 PullUpCorrelatedPredicateRule::create(),
202 PullUpCorrelatedPredicateAggRule::create(),
203 ],
204 ApplyOrder::BottomUp,
205 )
206});
207
208static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
209 OptimizationStage::new(
210 "Set Operation Merge",
211 vec![
212 UnionMergeRule::create(),
213 IntersectMergeRule::create(),
214 ExceptMergeRule::create(),
215 ],
216 ApplyOrder::BottomUp,
217 )
218});
219
220static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
221 LazyLock::new(|| {
222 OptimizationStage::new(
223 "General Unnesting(Translate Apply)",
224 vec![TranslateApplyRule::create(true)],
225 ApplyOrder::TopDown,
226 )
227 });
228
229static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
230 LazyLock::new(|| {
231 OptimizationStage::new(
232 "General Unnesting(Translate Apply)",
233 vec![TranslateApplyRule::create(false)],
234 ApplyOrder::TopDown,
235 )
236 });
237
238static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
239 OptimizationStage::new(
240 "General Unnesting(Push Down Apply)",
241 vec![
242 ApplyEliminateRule::create(),
243 ApplyAggTransposeRule::create(),
244 ApplyDedupTransposeRule::create(),
245 ApplyFilterTransposeRule::create(),
246 ApplyProjectTransposeRule::create(),
247 ApplyProjectSetTransposeRule::create(),
248 ApplyTopNTransposeRule::create(),
249 ApplyLimitTransposeRule::create(),
250 ApplyJoinTransposeRule::create(),
251 ApplyUnionTransposeRule::create(),
252 ApplyOverWindowTransposeRule::create(),
253 ApplyExpandTransposeRule::create(),
254 ApplyHopWindowTransposeRule::create(),
255 CrossJoinEliminateRule::create(),
256 ApplyShareEliminateRule::create(),
257 ],
258 ApplyOrder::TopDown,
259 )
260});
261
262static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
263 OptimizationStage::new(
264 "To MultiJoin",
265 vec![MergeMultiJoinRule::create()],
266 ApplyOrder::TopDown,
267 )
268});
269
270static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
271 OptimizationStage::new(
272 "Join Ordering".to_owned(),
273 vec![LeftDeepTreeJoinOrderingRule::create()],
274 ApplyOrder::TopDown,
275 )
276});
277
278static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
279 OptimizationStage::new(
280 "Join Ordering".to_owned(),
281 vec![BushyTreeJoinOrderingRule::create()],
282 ApplyOrder::TopDown,
283 )
284});
285
286static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287 OptimizationStage::new(
288 "Push down filter with now into a left semijoin",
289 vec![
290 SplitNowAndRule::create(),
291 SplitNowOrRule::create(),
292 FilterWithNowToJoinRule::create(),
293 ],
294 ApplyOrder::TopDown,
295 )
296});
297
298static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
299 OptimizationStage::new(
300 "Push down the calculation of inputs of join's condition",
301 vec![PushCalculationOfJoinRule::create()],
302 ApplyOrder::TopDown,
303 )
304});
305
306static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
307 OptimizationStage::new(
308 "Convert Distinct Aggregation",
309 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
310 ApplyOrder::TopDown,
311 )
312});
313
314static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
315 OptimizationStage::new(
316 "Convert Distinct Aggregation",
317 vec![
318 UnionToDistinctRule::create(),
319 DistinctAggRule::create(false),
320 ],
321 ApplyOrder::TopDown,
322 )
323});
324
325static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
326 OptimizationStage::new(
327 "Simplify Aggregation",
328 vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
329 ApplyOrder::TopDown,
330 )
331});
332
333static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
334 OptimizationStage::new(
335 "Join Commute".to_owned(),
336 vec![JoinCommuteRule::create()],
337 ApplyOrder::TopDown,
338 )
339});
340
341static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
342 OptimizationStage::new(
343 "Constant Output Operator Remove",
344 vec![EmptyAggRemoveRule::create()],
345 ApplyOrder::TopDown,
346 )
347});
348
349static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
350 OptimizationStage::new(
351 "Project Remove",
352 vec![
353 ProjectMergeRule::create(),
355 ProjectEliminateRule::create(),
356 TrivialProjectToValuesRule::create(),
357 UnionInputValuesMergeRule::create(),
358 JoinProjectTransposeRule::create(),
359 ProjectJoinMergeRule::create(),
362 AggProjectMergeRule::create(),
363 ],
364 ApplyOrder::BottomUp,
365 )
366});
367
368static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
369 OptimizationStage::new(
370 "Split Over Window",
371 vec![OverWindowSplitRule::create()],
372 ApplyOrder::TopDown,
373 )
374});
375
376static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
380 OptimizationStage::new(
381 "Convert Over Window",
382 vec![
383 ProjectMergeRule::create(),
384 ProjectEliminateRule::create(),
385 TrivialProjectToValuesRule::create(),
386 UnionInputValuesMergeRule::create(),
387 OverWindowToAggAndJoinRule::create(),
388 OverWindowToTopNRule::create(),
389 ],
390 ApplyOrder::TopDown,
391 )
392});
393
394static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
395 OptimizationStage::new(
396 "Merge Over Window",
397 vec![OverWindowMergeRule::create()],
398 ApplyOrder::TopDown,
399 )
400});
401
402static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
403 OptimizationStage::new(
404 "Rewrite Like Expr",
405 vec![RewriteLikeExprRule::create()],
406 ApplyOrder::TopDown,
407 )
408});
409
410static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
411 OptimizationStage::new(
412 "TopN/SimpleAgg on Index",
413 vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
414 ApplyOrder::TopDown,
415 )
416});
417
418static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
419 OptimizationStage::new(
420 "Void always-false filter's downstream",
421 vec![AlwaysFalseFilterRule::create()],
422 ApplyOrder::TopDown,
423 )
424});
425
426static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
427 OptimizationStage::new(
428 "Push Down Limit",
429 vec![LimitPushDownRule::create()],
430 ApplyOrder::TopDown,
431 )
432});
433
434static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
435 OptimizationStage::new(
436 "Pull Up Hop",
437 vec![PullUpHopRule::create()],
438 ApplyOrder::BottomUp,
439 )
440});
441
442static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
443 OptimizationStage::new(
444 "Set Operation To Join",
445 vec![
446 IntersectToSemiJoinRule::create(),
447 ExceptToAntiJoinRule::create(),
448 ],
449 ApplyOrder::BottomUp,
450 )
451});
452
453static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
454 OptimizationStage::new(
455 "Grouping Sets",
456 vec![
457 GroupingSetsToExpandRule::create(),
458 ExpandToProjectRule::create(),
459 ],
460 ApplyOrder::TopDown,
461 )
462});
463
464static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
465 OptimizationStage::new(
466 "Common Sub Expression Extract",
467 vec![CommonSubExprExtractRule::create()],
468 ApplyOrder::TopDown,
469 )
470});
471
472static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
473 OptimizationStage::new(
474 "Logical Filter Expression Simplify",
475 vec![LogicalFilterExpressionSimplifyRule::create()],
476 ApplyOrder::TopDown,
477 )
478});
479
480static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
481 OptimizationStage::new(
482 "Rewrite Source For Batch",
483 vec![
484 SourceToKafkaScanRule::create(),
485 SourceToIcebergScanRule::create(),
486 ],
487 ApplyOrder::TopDown,
488 )
489});
490
491impl LogicalOptimizer {
492 pub fn predicate_pushdown(
493 plan: PlanRef,
494 explain_trace: bool,
495 ctx: &OptimizerContextRef,
496 ) -> PlanRef {
497 let plan = plan.predicate_pushdown(
498 Condition::true_cond(),
499 &mut PredicatePushdownContext::new(plan.clone()),
500 );
501 if explain_trace {
502 ctx.trace("Predicate Push Down:");
503 ctx.trace(plan.explain_to_string());
504 }
505 plan
506 }
507
508 pub fn subquery_unnesting(
509 mut plan: PlanRef,
510 enable_share_plan: bool,
511 explain_trace: bool,
512 ctx: &OptimizerContextRef,
513 ) -> Result<PlanRef> {
514 if !has_logical_apply(plan.clone()) {
516 return Ok(plan);
517 }
518 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
520 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
521 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
524 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
526 plan = if enable_share_plan {
530 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
531 } else {
532 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
533 };
534 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
535
536 plan.check_apply_elimination()?;
538
539 Ok(plan)
540 }
541
542 pub fn column_pruning(
543 mut plan: PlanRef,
544 explain_trace: bool,
545 ctx: &OptimizerContextRef,
546 ) -> PlanRef {
547 let required_cols = (0..plan.schema().len()).collect_vec();
548 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
549 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
550 if explain_trace {
552 ctx.trace("Prune Columns:");
553 ctx.trace(plan.explain_to_string());
554 }
555
556 if column_pruning_ctx.need_second_round() {
557 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
560 if explain_trace {
561 ctx.trace("Prune Columns (For DAG):");
562 ctx.trace(plan.explain_to_string());
563 }
564 }
565 plan
566 }
567
568 pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
569 let mut v = NowProcTimeFinder::default();
571 plan.visit_exprs_recursive(&mut v);
572 if !v.has() {
573 return plan;
574 }
575
576 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
577
578 let plan = plan.rewrite_exprs_recursive(&mut v);
579
580 if ctx.is_explain_trace() {
581 ctx.trace("Inline Now and ProcTime:");
582 ctx.trace(plan.explain_to_string());
583 }
584 plan
585 }
586
587 pub fn gen_optimized_logical_plan_for_stream(mut plan: PlanRef) -> Result<PlanRef> {
588 let ctx = plan.ctx();
589 let explain_trace = ctx.is_explain_trace();
590
591 if explain_trace {
592 ctx.trace("Begin:");
593 ctx.trace(plan.explain_to_string());
594 }
595
596 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
598 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
600 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
602
603 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
607 if enable_share_plan {
608 plan = plan.common_subplan_sharing();
610 plan = plan.prune_share();
611 if explain_trace {
612 ctx.trace("Common Sub-plan Sharing:");
613 ctx.trace(plan.explain_to_string());
614 }
615 } else {
616 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
617
618 plan = ShareSourceRewriter::share_source(plan);
622 if explain_trace {
623 ctx.trace("Share Source:");
624 ctx.trace(plan.explain_to_string());
625 }
626 }
627 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
628 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
629 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
632 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
634
635 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
636 if has_logical_max_one_row(plan.clone()) {
637 bail!("Scalar subquery might produce more than one row.");
641 }
642
643 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
646
647 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
649
650 if plan.ctx().session_ctx().config().enable_join_ordering() {
651 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
655
656 if plan
658 .ctx()
659 .session_ctx()
660 .config()
661 .streaming_enable_bushy_join()
662 {
663 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
664 } else {
665 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
666 }
667 }
668
669 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
672
673 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
675
676 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
678
679 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
680 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
683 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
684 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
685
686 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
687 plan = if force_split_distinct_agg {
690 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
691 } else {
692 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
693 };
694
695 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
696
697 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
698
699 plan = Self::column_pruning(plan, explain_trace, &ctx);
701 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
702
703 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
704 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
705
706 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
707
708 #[cfg(debug_assertions)]
709 InputRefValidator.validate(plan.clone());
710
711 if ctx.is_explain_logical() {
712 match ctx.explain_format() {
713 ExplainFormat::Text => {
714 ctx.store_logical(plan.explain_to_string());
715 }
716 ExplainFormat::Json => {
717 ctx.store_logical(plan.explain_to_json());
718 }
719 ExplainFormat::Xml => {
720 ctx.store_logical(plan.explain_to_xml());
721 }
722 ExplainFormat::Yaml => {
723 ctx.store_logical(plan.explain_to_yaml());
724 }
725 ExplainFormat::Dot => {
726 ctx.store_logical(plan.explain_to_dot());
727 }
728 }
729 }
730
731 Ok(plan)
732 }
733
734 pub fn gen_optimized_logical_plan_for_batch(mut plan: PlanRef) -> Result<PlanRef> {
735 let ctx = plan.ctx();
736 let explain_trace = ctx.is_explain_trace();
737
738 if explain_trace {
739 ctx.trace("Begin:");
740 ctx.trace(plan.explain_to_string());
741 }
742
743 plan = Self::inline_now_proc_time(plan, &ctx);
745
746 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
748
749 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
750 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
751 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
752 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
753 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
754 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
755 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
757 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
758 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
759 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
760 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
762
763 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
764
765 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
769
770 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
772 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
773
774 if plan.ctx().session_ctx().config().enable_join_ordering() {
775 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
779
780 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
782 }
783
784 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
787 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
788 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
789 }
790
791 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
793
794 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
795 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
798 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
799 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
800 }
801 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
802 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
803
804 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
806
807 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
808
809 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
810
811 plan = Self::column_pruning(plan, explain_trace, &ctx);
813 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
814 (#[allow(unused_assignments)]
815 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
816 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
817 }
818
819 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
820 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
821
822 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
823
824 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
825
826 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
827
828 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
829
830 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
831
832 #[cfg(debug_assertions)]
833 InputRefValidator.validate(plan.clone());
834
835 if ctx.is_explain_logical() {
836 match ctx.explain_format() {
837 ExplainFormat::Text => {
838 ctx.store_logical(plan.explain_to_string());
839 }
840 ExplainFormat::Json => {
841 ctx.store_logical(plan.explain_to_json());
842 }
843 ExplainFormat::Xml => {
844 ctx.store_logical(plan.explain_to_xml());
845 }
846 ExplainFormat::Yaml => {
847 ctx.store_logical(plan.explain_to_yaml());
848 }
849 ExplainFormat::Dot => {
850 ctx.store_logical(plan.explain_to_dot());
851 }
852 }
853 }
854
855 Ok(plan)
856 }
857}