1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42 stage_name: &str,
43 ) -> Result<PlanRef<C>> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage<C>,
69 ) -> Result<PlanRef<C>> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage<C>,
80 ) -> Result<PlanRef<C>> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92 stage_name: String,
93 rules: Vec<BoxedRule<C>>,
94 apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Field-less type whose associated functions drive the logical optimization passes.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToInternalBackfillProgressRule::create(),
140 TableFunctionToInternalSourceBackfillProgressRule::create(),
142 TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144 TableFunctionToPostgresQueryRule::create(),
146 TableFunctionToMySqlQueryRule::create(),
148 TableFunctionToProjectSetRule::create(),
150 ],
151 ApplyOrder::TopDown,
152 )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156 OptimizationStage::new(
157 "Table Function To FileScan",
158 vec![TableFunctionToFileScanRule::create()],
159 ApplyOrder::TopDown,
160 )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164 OptimizationStage::new(
165 "Table Function To PostgresQuery",
166 vec![TableFunctionToPostgresQueryRule::create()],
167 ApplyOrder::TopDown,
168 )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172 OptimizationStage::new(
173 "Table Function To MySQL",
174 vec![TableFunctionToMySqlQueryRule::create()],
175 ApplyOrder::TopDown,
176 )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180 LazyLock::new(|| {
181 OptimizationStage::new(
182 "Table Function To Internal Backfill Progress",
183 vec![TableFunctionToInternalBackfillProgressRule::create()],
184 ApplyOrder::TopDown,
185 )
186 });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189 LazyLock::new(|| {
190 OptimizationStage::new(
191 "Table Function To Internal Source Backfill Progress",
192 vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193 ApplyOrder::TopDown,
194 )
195 });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198 LazyLock::new(|| {
199 OptimizationStage::new(
200 "Table Function To Internal Get Channel Delta Stats",
201 vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202 ApplyOrder::TopDown,
203 )
204 });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207 OptimizationStage::new(
208 "Values Extract Project",
209 vec![ValuesExtractProjectRule::create()],
210 ApplyOrder::TopDown,
211 )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215 OptimizationStage::new(
216 "Simple Unnesting",
217 vec![
218 PullUpCorrelatedPredicateRule::create(),
220 PullUpCorrelatedProjectValueRule::create(),
222 PullUpCorrelatedPredicateAggRule::create(),
223 MaxOneRowEliminateRule::create(),
225 ApplyToJoinRule::create(),
227 ],
228 ApplyOrder::BottomUp,
229 )
230});
231
232static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
233 OptimizationStage::new(
234 "Set Operation Merge",
235 vec![
236 UnionMergeRule::create(),
237 IntersectMergeRule::create(),
238 ExceptMergeRule::create(),
239 ],
240 ApplyOrder::BottomUp,
241 )
242});
243
244static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
245 LazyLock::new(|| {
246 OptimizationStage::new(
247 "General Unnesting(Translate Apply)",
248 vec![TranslateApplyRule::create(true)],
249 ApplyOrder::TopDown,
250 )
251 });
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
254 LazyLock::new(|| {
255 OptimizationStage::new(
256 "General Unnesting(Translate Apply)",
257 vec![TranslateApplyRule::create(false)],
258 ApplyOrder::TopDown,
259 )
260 });
261
262static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
263 OptimizationStage::new(
264 "General Unnesting(Push Down Apply)",
265 vec![
266 ApplyEliminateRule::create(),
267 ApplyAggTransposeRule::create(),
268 ApplyDedupTransposeRule::create(),
269 ApplyFilterTransposeRule::create(),
270 ApplyProjectTransposeRule::create(),
271 ApplyProjectSetTransposeRule::create(),
272 ApplyTopNTransposeRule::create(),
273 ApplyLimitTransposeRule::create(),
274 ApplyJoinTransposeRule::create(),
275 ApplyUnionTransposeRule::create(),
276 ApplyOverWindowTransposeRule::create(),
277 ApplyExpandTransposeRule::create(),
278 ApplyHopWindowTransposeRule::create(),
279 CrossJoinEliminateRule::create(),
280 ApplyShareEliminateRule::create(),
281 ],
282 ApplyOrder::TopDown,
283 )
284});
285
286static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
287 OptimizationStage::new(
288 "To MultiJoin",
289 vec![MergeMultiJoinRule::create()],
290 ApplyOrder::TopDown,
291 )
292});
293
294static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
295 OptimizationStage::new(
296 "Join Ordering".to_owned(),
297 vec![LeftDeepTreeJoinOrderingRule::create()],
298 ApplyOrder::TopDown,
299 )
300});
301
302static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
303 OptimizationStage::new(
304 "Join Ordering".to_owned(),
305 vec![BushyTreeJoinOrderingRule::create()],
306 ApplyOrder::TopDown,
307 )
308});
309
310static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
311 OptimizationStage::new(
312 "Push down filter with now into a left semijoin",
313 vec![
314 SplitNowAndRule::create(),
315 SplitNowOrRule::create(),
316 FilterWithNowToJoinRule::create(),
317 ],
318 ApplyOrder::TopDown,
319 )
320});
321
322static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
323 OptimizationStage::new(
324 "Push down the calculation of inputs of join's condition",
325 vec![PushCalculationOfJoinRule::create()],
326 ApplyOrder::TopDown,
327 )
328});
329
330static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
331 OptimizationStage::new(
332 "Convert Distinct Aggregation",
333 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
334 ApplyOrder::TopDown,
335 )
336});
337
338static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
339 OptimizationStage::new(
340 "Convert Distinct Aggregation",
341 vec![
342 UnionToDistinctRule::create(),
343 DistinctAggRule::create(false),
344 ],
345 ApplyOrder::TopDown,
346 )
347});
348
349static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
350 OptimizationStage::new(
351 "Simplify Aggregation",
352 vec![
353 AggGroupBySimplifyRule::create(),
354 AggCallMergeRule::create(),
355 UnifyFirstLastValueRule::create(),
356 ],
357 ApplyOrder::TopDown,
358 )
359});
360
361static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
362 OptimizationStage::new(
363 "Join Commute".to_owned(),
364 vec![JoinCommuteRule::create()],
365 ApplyOrder::TopDown,
366 )
367});
368
369static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
370 OptimizationStage::new(
371 "Constant Output Operator Remove",
372 vec![EmptyAggRemoveRule::create()],
373 ApplyOrder::TopDown,
374 )
375});
376
377static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
378 OptimizationStage::new(
379 "Project Remove",
380 vec![
381 ProjectMergeRule::create(),
383 ProjectEliminateRule::create(),
384 TrivialProjectToValuesRule::create(),
385 UnionInputValuesMergeRule::create(),
386 JoinProjectTransposeRule::create(),
387 ProjectJoinMergeRule::create(),
390 AggProjectMergeRule::create(),
391 ],
392 ApplyOrder::BottomUp,
393 )
394});
395
396static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
397 OptimizationStage::new(
398 "Split Over Window",
399 vec![OverWindowSplitRule::create()],
400 ApplyOrder::TopDown,
401 )
402});
403
404static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
408 OptimizationStage::new(
409 "Convert Over Window",
410 vec![
411 ProjectMergeRule::create(),
412 ProjectEliminateRule::create(),
413 TrivialProjectToValuesRule::create(),
414 UnionInputValuesMergeRule::create(),
415 OverWindowToAggAndJoinRule::create(),
416 OverWindowToTopNRule::create(),
417 ],
418 ApplyOrder::TopDown,
419 )
420});
421
422static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
424 OptimizationStage::new(
425 "Convert Over Window",
426 vec![
427 ProjectMergeRule::create(),
428 ProjectEliminateRule::create(),
429 TrivialProjectToValuesRule::create(),
430 UnionInputValuesMergeRule::create(),
431 OverWindowToAggAndJoinRule::create(),
432 ],
433 ApplyOrder::TopDown,
434 )
435});
436
437static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
438 OptimizationStage::new(
439 "Merge Over Window",
440 vec![OverWindowMergeRule::create()],
441 ApplyOrder::TopDown,
442 )
443});
444
445static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
446 OptimizationStage::new(
447 "Rewrite Like Expr",
448 vec![RewriteLikeExprRule::create()],
449 ApplyOrder::TopDown,
450 )
451});
452
453static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
454 OptimizationStage::new(
455 "TopN/SimpleAgg on Index",
456 vec![TopNOnIndexRule::create(), MinMaxOnIndexRule::create()],
457 ApplyOrder::TopDown,
458 )
459});
460
461static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
462 OptimizationStage::new(
463 "Void always-false filter's downstream",
464 vec![AlwaysFalseFilterRule::create()],
465 ApplyOrder::TopDown,
466 )
467});
468
469static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
470 OptimizationStage::new(
471 "Push Down Limit",
472 vec![LimitPushDownRule::create()],
473 ApplyOrder::TopDown,
474 )
475});
476
477static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
478 OptimizationStage::new(
479 "Pull Up Hop",
480 vec![PullUpHopRule::create()],
481 ApplyOrder::BottomUp,
482 )
483});
484
485static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
486 OptimizationStage::new(
487 "Set Operation To Join",
488 vec![
489 IntersectToSemiJoinRule::create(),
490 ExceptToAntiJoinRule::create(),
491 ],
492 ApplyOrder::BottomUp,
493 )
494});
495
496static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
497 OptimizationStage::new(
498 "Grouping Sets",
499 vec![
500 GroupingSetsToExpandRule::create(),
501 ExpandToProjectRule::create(),
502 ],
503 ApplyOrder::TopDown,
504 )
505});
506
507static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
508 OptimizationStage::new(
509 "Common Sub Expression Extract",
510 vec![CommonSubExprExtractRule::create()],
511 ApplyOrder::TopDown,
512 )
513});
514
515static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
516 OptimizationStage::new(
517 "Logical Filter Expression Simplify",
518 vec![LogicalFilterExpressionSimplifyRule::create()],
519 ApplyOrder::TopDown,
520 )
521});
522
523static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
524 OptimizationStage::new(
525 "Rewrite Source For Batch",
526 vec![
527 SourceToKafkaScanRule::create(),
528 SourceToIcebergScanRule::create(),
529 ],
530 ApplyOrder::TopDown,
531 )
532});
533
534static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
535 OptimizationStage::new(
536 "TopN to Vector Search",
537 vec![TopNToVectorSearchRule::create()],
538 ApplyOrder::BottomUp,
539 )
540});
541
542static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
543 LazyLock::new(|| {
544 OptimizationStage::new(
545 "Correlated TopN to Vector Search",
546 vec![CorrelatedTopNToVectorSearchRule::create(true)],
547 ApplyOrder::BottomUp,
548 )
549 });
550
551static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
552 LazyLock::new(|| {
553 OptimizationStage::new(
554 "Correlated TopN to Vector Search",
555 vec![CorrelatedTopNToVectorSearchRule::create(false)],
556 ApplyOrder::BottomUp,
557 )
558 });
559
560impl LogicalOptimizer {
561 pub fn predicate_pushdown(
562 plan: LogicalPlanRef,
563 explain_trace: bool,
564 ctx: &OptimizerContextRef,
565 ) -> LogicalPlanRef {
566 let plan = plan.predicate_pushdown(
567 Condition::true_cond(),
568 &mut PredicatePushdownContext::new(plan.clone()),
569 );
570 if explain_trace {
571 ctx.trace("Predicate Push Down:");
572 ctx.trace(plan.explain_to_string());
573 }
574 plan
575 }
576
577 pub fn subquery_unnesting(
578 mut plan: LogicalPlanRef,
579 enable_share_plan: bool,
580 explain_trace: bool,
581 ctx: &OptimizerContextRef,
582 ) -> Result<LogicalPlanRef> {
583 if !has_logical_apply(plan.clone()) {
585 return Ok(plan);
586 }
587 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
589 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
590 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
593 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
595 plan = if enable_share_plan {
599 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
600 } else {
601 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
602 };
603 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
604
605 plan.check_apply_elimination()?;
607
608 Ok(plan)
609 }
610
611 pub fn column_pruning(
612 mut plan: LogicalPlanRef,
613 explain_trace: bool,
614 ctx: &OptimizerContextRef,
615 ) -> LogicalPlanRef {
616 let required_cols = (0..plan.schema().len()).collect_vec();
617 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
618 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
619 if explain_trace {
621 ctx.trace("Prune Columns:");
622 ctx.trace(plan.explain_to_string());
623 }
624
625 if column_pruning_ctx.need_second_round() {
626 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
629 if explain_trace {
630 ctx.trace("Prune Columns (For DAG):");
631 ctx.trace(plan.explain_to_string());
632 }
633 }
634 plan
635 }
636
637 pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
638 let mut v = NowProcTimeFinder::default();
640 plan.visit_exprs_recursive(&mut v);
641 if !v.has() {
642 return plan;
643 }
644
645 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
646
647 let plan = plan.rewrite_exprs_recursive(&mut v);
648
649 if ctx.is_explain_trace() {
650 ctx.trace("Inline Now and ProcTime:");
651 ctx.trace(plan.explain_to_string());
652 }
653 plan
654 }
655
656 pub fn gen_optimized_logical_plan_for_stream(
657 mut plan: LogicalPlanRef,
658 ) -> Result<LogicalPlanRef> {
659 let ctx = plan.ctx();
660 let explain_trace = ctx.is_explain_trace();
661
662 if explain_trace {
663 ctx.trace("Begin:");
664 ctx.trace(plan.explain_to_string());
665 }
666
667 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
669 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
671 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
673
674 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
678 if enable_share_plan {
679 plan = plan.common_subplan_sharing();
681 plan = plan.prune_share();
682 if explain_trace {
683 ctx.trace("Common Sub-plan Sharing:");
684 ctx.trace(plan.explain_to_string());
685 }
686 } else {
687 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
688
689 plan = ShareSourceRewriter::share_source(plan);
693 if explain_trace {
694 ctx.trace("Share Source:");
695 ctx.trace(plan.explain_to_string());
696 }
697 }
698 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
699 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
700 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
703 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
705
706 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;
707
708 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
709 if has_logical_max_one_row(plan.clone()) {
710 bail!("Scalar subquery might produce more than one row.");
714 }
715
716 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
719
720 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
722
723 if plan.ctx().session_ctx().config().enable_join_ordering() {
724 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
728
729 if plan
731 .ctx()
732 .session_ctx()
733 .config()
734 .streaming_enable_bushy_join()
735 {
736 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
737 } else {
738 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
739 }
740 }
741
742 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
745
746 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
748
749 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
751
752 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
753 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
756 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
757 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
758
759 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
760 plan = if force_split_distinct_agg {
763 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
764 } else {
765 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
766 };
767
768 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
769
770 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
771
772 plan = Self::column_pruning(plan, explain_trace, &ctx);
774 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
775
776 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
777 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
778
779 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
780
781 #[cfg(debug_assertions)]
782 InputRefValidator.validate(plan.clone());
783
784 ctx.may_store_explain_logical(&plan);
785
786 Ok(plan)
787 }
788
789 pub fn gen_optimized_logical_plan_for_batch(
790 mut plan: LogicalPlanRef,
791 ) -> Result<LogicalPlanRef> {
792 let ctx = plan.ctx();
793 let explain_trace = ctx.is_explain_trace();
794
795 if explain_trace {
796 ctx.trace("Begin:");
797 ctx.trace(plan.explain_to_string());
798 }
799
800 plan = Self::inline_now_proc_time(plan, &ctx);
802
803 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
805
806 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
807 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
808 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
809 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
810 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
811 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
812 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
814 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
815 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
816 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
817 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
818 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
819 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
821
822 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
823
824 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
825
826 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
830
831 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
833 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
834
835 if plan.ctx().session_ctx().config().enable_join_ordering() {
836 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
840
841 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
843 }
844
845 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
848 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
849 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
850 }
851
852 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
854
855 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
856 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
859 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
860 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
861 }
862 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
863 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
864
865 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
867
868 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
869
870 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
871
872 plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
873
874 plan = Self::column_pruning(plan, explain_trace, &ctx);
876 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
877 (#[allow(unused_assignments)]
878 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
879 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
880 }
881
882 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
883 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
884
885 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
886
887 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
888
889 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
890
891 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
892
893 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
894
895 #[cfg(debug_assertions)]
896 InputRefValidator.validate(plan.clone());
897
898 ctx.may_store_explain_logical(&plan);
899
900 Ok(plan)
901 }
902}