1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42 stage_name: &str,
43 ) -> Result<PlanRef<C>> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage<C>,
69 ) -> Result<PlanRef<C>> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage<C>,
80 ) -> Result<PlanRef<C>> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
/// A named set of rewrite rules together with the order in which they are
/// applied. The convention marker `C` defaults to [`Logical`].
pub struct OptimizationStage<C: ConventionMarker = Logical> {
    /// Label written to the explain trace when the stage applied a rule.
    stage_name: String,
    /// Rules handed to the heuristic optimizer for this stage.
    rules: Vec<BoxedRule<C>>,
    /// Traversal order handed to the heuristic optimizer for this stage.
    apply_order: ApplyOrder,
}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use risingwave_sqlparser::ast::ExplainFormat;
113
114use crate::optimizer::plan_node::generic::GenericPlanRef;
115
/// Stateless holder for the logical optimization entry points; every method in
/// its `impl` block is an associated function.
pub struct LogicalOptimizer {}
117
/// Stage "DAG To Tree": [`DagToTreeRule`], applied top-down.
static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "DAG To Tree",
        vec![DagToTreeRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Convert GENERATE_SERIES Ends With NOW": [`GenerateSeriesWithNowRule`],
/// applied top-down. Only the streaming pipeline in this file uses it.
static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert GENERATE_SERIES Ends With NOW",
        vec![GenerateSeriesWithNowRule::create()],
        ApplyOrder::TopDown,
    )
});
133
/// Stage "Table Function Convert": all table-function conversion rules in one
/// stage, applied top-down. [`TableFunctionToProjectSetRule`] is listed last.
// NOTE(review): the listing order presumably determines rule priority inside
// the heuristic optimizer — confirm before reordering.
static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function Convert",
        vec![
            TableFunctionToFileScanRule::create(),
            TableFunctionToInternalBackfillProgressRule::create(),
            TableFunctionToInternalSourceBackfillProgressRule::create(),
            TableFunctionToPostgresQueryRule::create(),
            TableFunctionToMySqlQueryRule::create(),
            TableFunctionToProjectSetRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "Table Function To FileScan": [`TableFunctionToFileScanRule`] alone,
/// applied top-down.
static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To FileScan",
        vec![TableFunctionToFileScanRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Table Function To PostgresQuery": [`TableFunctionToPostgresQueryRule`]
/// alone, applied top-down.
static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To PostgresQuery",
        vec![TableFunctionToPostgresQueryRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Table Function To MySQL": [`TableFunctionToMySqlQueryRule`] alone,
/// applied top-down.
static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To MySQL",
        vec![TableFunctionToMySqlQueryRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Table Function To Internal Backfill Progress":
/// [`TableFunctionToInternalBackfillProgressRule`] alone, applied top-down.
static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Table Function To Internal Backfill Progress",
            vec![TableFunctionToInternalBackfillProgressRule::create()],
            ApplyOrder::TopDown,
        )
    });

/// Stage "Table Function To Internal Source Backfill Progress":
/// [`TableFunctionToInternalSourceBackfillProgressRule`] alone, applied top-down.
static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Table Function To Internal Source Backfill Progress",
            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
            ApplyOrder::TopDown,
        )
    });
196
/// Stage "Values Extract Project": [`ValuesExtractProjectRule`], applied top-down.
static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Values Extract Project",
        vec![ValuesExtractProjectRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Simple Unnesting": pull-up rules for correlated predicates and
/// project values, plus max-one-row elimination and apply-to-join, applied
/// bottom-up.
static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Simple Unnesting",
        vec![
            PullUpCorrelatedPredicateRule::create(),
            PullUpCorrelatedProjectValueRule::create(),
            PullUpCorrelatedPredicateAggRule::create(),
            MaxOneRowEliminateRule::create(),
            ApplyToJoinRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});

/// Stage "Set Operation Merge": union, intersect and except merge rules,
/// applied bottom-up.
static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Set Operation Merge",
        vec![
            UnionMergeRule::create(),
            IntersectMergeRule::create(),
            ExceptMergeRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});

/// Stage "General Unnesting(Translate Apply)" with the rule created via
/// `TranslateApplyRule::create(true)`; selected when plan sharing is enabled.
static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "General Unnesting(Translate Apply)",
            vec![TranslateApplyRule::create(true)],
            ApplyOrder::TopDown,
        )
    });

/// Stage "General Unnesting(Translate Apply)" with the rule created via
/// `TranslateApplyRule::create(false)`; selected when plan sharing is disabled.
static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "General Unnesting(Translate Apply)",
            vec![TranslateApplyRule::create(false)],
            ApplyOrder::TopDown,
        )
    });

/// Stage "General Unnesting(Push Down Apply)": the apply transpose and
/// elimination rules, applied top-down. `subquery_unnesting` runs this stage
/// until a fix point is reached.
static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "General Unnesting(Push Down Apply)",
        vec![
            ApplyEliminateRule::create(),
            ApplyAggTransposeRule::create(),
            ApplyDedupTransposeRule::create(),
            ApplyFilterTransposeRule::create(),
            ApplyProjectTransposeRule::create(),
            ApplyProjectSetTransposeRule::create(),
            ApplyTopNTransposeRule::create(),
            ApplyLimitTransposeRule::create(),
            ApplyJoinTransposeRule::create(),
            ApplyUnionTransposeRule::create(),
            ApplyOverWindowTransposeRule::create(),
            ApplyExpandTransposeRule::create(),
            ApplyHopWindowTransposeRule::create(),
            CrossJoinEliminateRule::create(),
            ApplyShareEliminateRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
276
/// Stage "To MultiJoin": [`MergeMultiJoinRule`], applied top-down. Both
/// pipelines run it right before a join-ordering stage.
static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "To MultiJoin",
        vec![MergeMultiJoinRule::create()],
        ApplyOrder::TopDown,
    )
});
284
285static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
286 OptimizationStage::new(
287 "Join Ordering".to_owned(),
288 vec![LeftDeepTreeJoinOrderingRule::create()],
289 ApplyOrder::TopDown,
290 )
291});
292
293static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
294 OptimizationStage::new(
295 "Join Ordering".to_owned(),
296 vec![BushyTreeJoinOrderingRule::create()],
297 ApplyOrder::TopDown,
298 )
299});
300
/// Stage "Push down filter with now into a left semijoin": the two now-split
/// rules followed by [`FilterWithNowToJoinRule`], applied top-down.
static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push down filter with now into a left semijoin",
        vec![
            SplitNowAndRule::create(),
            SplitNowOrRule::create(),
            FilterWithNowToJoinRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "Push down the calculation of inputs of join's condition":
/// [`PushCalculationOfJoinRule`], applied top-down.
static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push down the calculation of inputs of join's condition",
        vec![PushCalculationOfJoinRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Convert Distinct Aggregation" with the rule created via
/// `DistinctAggRule::create(true)`, applied top-down.
static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Distinct Aggregation",
        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
        ApplyOrder::TopDown,
    )
});

/// Stage "Convert Distinct Aggregation" with the rule created via
/// `DistinctAggRule::create(false)`, applied top-down. The streaming pipeline
/// also selects this variant when `force_split_distinct_agg` is set.
static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Distinct Aggregation",
        vec![
            UnionToDistinctRule::create(),
            DistinctAggRule::create(false),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "Simplify Aggregation": [`AggGroupBySimplifyRule`] and
/// [`AggCallMergeRule`], applied top-down.
static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Simplify Aggregation",
        vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
        ApplyOrder::TopDown,
    )
});
347
348static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
349 OptimizationStage::new(
350 "Join Commute".to_owned(),
351 vec![JoinCommuteRule::create()],
352 ApplyOrder::TopDown,
353 )
354});
355
/// Stage "Constant Output Operator Remove": [`EmptyAggRemoveRule`], applied
/// top-down.
static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Constant Output Operator Remove",
        vec![EmptyAggRemoveRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Project Remove": project merge and elimination rules together with
/// the join and aggregation project-merge rules, applied bottom-up.
static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Project Remove",
        vec![
            ProjectMergeRule::create(),
            ProjectEliminateRule::create(),
            TrivialProjectToValuesRule::create(),
            UnionInputValuesMergeRule::create(),
            JoinProjectTransposeRule::create(),
            ProjectJoinMergeRule::create(),
            AggProjectMergeRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});
382
/// Stage "Split Over Window": [`OverWindowSplitRule`], applied top-down.
static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Split Over Window",
        vec![OverWindowSplitRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Convert Over Window": project clean-up rules followed by the
/// over-window conversion rules, applied top-down.
static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Over Window",
        vec![
            ProjectMergeRule::create(),
            ProjectEliminateRule::create(),
            TrivialProjectToValuesRule::create(),
            UnionInputValuesMergeRule::create(),
            OverWindowToAggAndJoinRule::create(),
            OverWindowToTopNRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "Merge Over Window": [`OverWindowMergeRule`], applied top-down.
static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Merge Over Window",
        vec![OverWindowMergeRule::create()],
        ApplyOrder::TopDown,
    )
});
416
/// Stage "Rewrite Like Expr": [`RewriteLikeExprRule`], applied top-down.
static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Rewrite Like Expr",
        vec![RewriteLikeExprRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "TopN/SimpleAgg on Index": [`TopNOnIndexRule`] and
/// [`MinMaxOnIndexRule`], applied top-down.
static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "TopN/SimpleAgg on Index",
        vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Void always-false filter's downstream": [`AlwaysFalseFilterRule`],
/// applied top-down.
static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Void always-false filter's downstream",
        vec![AlwaysFalseFilterRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Push Down Limit": [`LimitPushDownRule`], applied top-down.
static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push Down Limit",
        vec![LimitPushDownRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Pull Up Hop": [`PullUpHopRule`], applied bottom-up.
static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Pull Up Hop",
        vec![PullUpHopRule::create()],
        ApplyOrder::BottomUp,
    )
});
456
/// Stage "Set Operation To Join": [`IntersectToSemiJoinRule`] and
/// [`ExceptToAntiJoinRule`], applied bottom-up.
static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Set Operation To Join",
        vec![
            IntersectToSemiJoinRule::create(),
            ExceptToAntiJoinRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});

/// Stage "Grouping Sets": [`GroupingSetsToExpandRule`] and
/// [`ExpandToProjectRule`], applied top-down.
static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Grouping Sets",
        vec![
            GroupingSetsToExpandRule::create(),
            ExpandToProjectRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "Common Sub Expression Extract": [`CommonSubExprExtractRule`],
/// applied top-down.
static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Common Sub Expression Extract",
        vec![CommonSubExprExtractRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Logical Filter Expression Simplify":
/// [`LogicalFilterExpressionSimplifyRule`], applied top-down.
static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Logical Filter Expression Simplify",
        vec![LogicalFilterExpressionSimplifyRule::create()],
        ApplyOrder::TopDown,
    )
});

/// Stage "Rewrite Source For Batch": [`SourceToKafkaScanRule`] and
/// [`SourceToIcebergScanRule`], applied top-down.
static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Rewrite Source For Batch",
        vec![
            SourceToKafkaScanRule::create(),
            SourceToIcebergScanRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});

/// Stage "TopN to Vector Search": [`TopNToVectorSearchRule`], applied bottom-up.
static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "TopN to Vector Search",
        vec![TopNToVectorSearchRule::create()],
        ApplyOrder::BottomUp,
    )
});
513
514impl LogicalOptimizer {
515 pub fn predicate_pushdown(
516 plan: LogicalPlanRef,
517 explain_trace: bool,
518 ctx: &OptimizerContextRef,
519 ) -> LogicalPlanRef {
520 let plan = plan.predicate_pushdown(
521 Condition::true_cond(),
522 &mut PredicatePushdownContext::new(plan.clone()),
523 );
524 if explain_trace {
525 ctx.trace("Predicate Push Down:");
526 ctx.trace(plan.explain_to_string());
527 }
528 plan
529 }
530
531 pub fn subquery_unnesting(
532 mut plan: LogicalPlanRef,
533 enable_share_plan: bool,
534 explain_trace: bool,
535 ctx: &OptimizerContextRef,
536 ) -> Result<LogicalPlanRef> {
537 if !has_logical_apply(plan.clone()) {
539 return Ok(plan);
540 }
541 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
543 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
544 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
547 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
549 plan = if enable_share_plan {
553 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
554 } else {
555 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
556 };
557 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
558
559 plan.check_apply_elimination()?;
561
562 Ok(plan)
563 }
564
565 pub fn column_pruning(
566 mut plan: LogicalPlanRef,
567 explain_trace: bool,
568 ctx: &OptimizerContextRef,
569 ) -> LogicalPlanRef {
570 let required_cols = (0..plan.schema().len()).collect_vec();
571 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
572 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
573 if explain_trace {
575 ctx.trace("Prune Columns:");
576 ctx.trace(plan.explain_to_string());
577 }
578
579 if column_pruning_ctx.need_second_round() {
580 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
583 if explain_trace {
584 ctx.trace("Prune Columns (For DAG):");
585 ctx.trace(plan.explain_to_string());
586 }
587 }
588 plan
589 }
590
591 pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
592 let mut v = NowProcTimeFinder::default();
594 plan.visit_exprs_recursive(&mut v);
595 if !v.has() {
596 return plan;
597 }
598
599 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
600
601 let plan = plan.rewrite_exprs_recursive(&mut v);
602
603 if ctx.is_explain_trace() {
604 ctx.trace("Inline Now and ProcTime:");
605 ctx.trace(plan.explain_to_string());
606 }
607 plan
608 }
609
610 pub fn gen_optimized_logical_plan_for_stream(
611 mut plan: LogicalPlanRef,
612 ) -> Result<LogicalPlanRef> {
613 let ctx = plan.ctx();
614 let explain_trace = ctx.is_explain_trace();
615
616 if explain_trace {
617 ctx.trace("Begin:");
618 ctx.trace(plan.explain_to_string());
619 }
620
621 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
623 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
625 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
627
628 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
632 if enable_share_plan {
633 plan = plan.common_subplan_sharing();
635 plan = plan.prune_share();
636 if explain_trace {
637 ctx.trace("Common Sub-plan Sharing:");
638 ctx.trace(plan.explain_to_string());
639 }
640 } else {
641 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
642
643 plan = ShareSourceRewriter::share_source(plan);
647 if explain_trace {
648 ctx.trace("Share Source:");
649 ctx.trace(plan.explain_to_string());
650 }
651 }
652 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
653 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
654 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
657 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
659
660 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
661 if has_logical_max_one_row(plan.clone()) {
662 bail!("Scalar subquery might produce more than one row.");
666 }
667
668 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
671
672 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
674
675 if plan.ctx().session_ctx().config().enable_join_ordering() {
676 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
680
681 if plan
683 .ctx()
684 .session_ctx()
685 .config()
686 .streaming_enable_bushy_join()
687 {
688 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
689 } else {
690 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
691 }
692 }
693
694 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
697
698 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
700
701 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
703
704 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
705 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
708 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
709 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
710
711 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
712 plan = if force_split_distinct_agg {
715 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
716 } else {
717 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
718 };
719
720 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
721
722 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
723
724 plan = Self::column_pruning(plan, explain_trace, &ctx);
726 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
727
728 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
729 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
730
731 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
732
733 #[cfg(debug_assertions)]
734 InputRefValidator.validate(plan.clone());
735
736 if ctx.is_explain_logical() {
737 match ctx.explain_format() {
738 ExplainFormat::Text => {
739 ctx.store_logical(plan.explain_to_string());
740 }
741 ExplainFormat::Json => {
742 ctx.store_logical(plan.explain_to_json());
743 }
744 ExplainFormat::Xml => {
745 ctx.store_logical(plan.explain_to_xml());
746 }
747 ExplainFormat::Yaml => {
748 ctx.store_logical(plan.explain_to_yaml());
749 }
750 ExplainFormat::Dot => {
751 ctx.store_logical(plan.explain_to_dot());
752 }
753 }
754 }
755
756 Ok(plan)
757 }
758
759 pub fn gen_optimized_logical_plan_for_batch(
760 mut plan: LogicalPlanRef,
761 ) -> Result<LogicalPlanRef> {
762 let ctx = plan.ctx();
763 let explain_trace = ctx.is_explain_trace();
764
765 if explain_trace {
766 ctx.trace("Begin:");
767 ctx.trace(plan.explain_to_string());
768 }
769
770 plan = Self::inline_now_proc_time(plan, &ctx);
772
773 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
775
776 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
777 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
778 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
779 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
780 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
781 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
782 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
784 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
785 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
786 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
787 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
788 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
790
791 plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
792
793 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
794
795 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
799
800 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
802 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
803
804 if plan.ctx().session_ctx().config().enable_join_ordering() {
805 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
809
810 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
812 }
813
814 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
817 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
818 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
819 }
820
821 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
823
824 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
825 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
828 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
829 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
830 }
831 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
832 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
833
834 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
836
837 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
838
839 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
840
841 plan = Self::column_pruning(plan, explain_trace, &ctx);
843 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
844 (#[allow(unused_assignments)]
845 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
846 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
847 }
848
849 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
850 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
851
852 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
853
854 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
855
856 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
857
858 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
859
860 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
861
862 #[cfg(debug_assertions)]
863 InputRefValidator.validate(plan.clone());
864
865 if ctx.is_explain_logical() {
866 match ctx.explain_format() {
867 ExplainFormat::Text => {
868 ctx.store_logical(plan.explain_to_string());
869 }
870 ExplainFormat::Json => {
871 ctx.store_logical(plan.explain_to_json());
872 }
873 ExplainFormat::Xml => {
874 ctx.store_logical(plan.explain_to_xml());
875 }
876 ExplainFormat::Yaml => {
877 ctx.store_logical(plan.explain_to_yaml());
878 }
879 ExplainFormat::Dot => {
880 ctx.store_logical(plan.explain_to_dot());
881 }
882 }
883 }
884
885 Ok(plan)
886 }
887}