1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Explain, OptimizerContextRef};
37
38impl<C: ConventionMarker> PlanRef<C> {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42 stage_name: &str,
43 ) -> Result<PlanRef<C>> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage<C>,
69 ) -> Result<PlanRef<C>> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage<C>,
80 ) -> Result<PlanRef<C>> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92 stage_name: String,
93 rules: Vec<BoxedRule<C>>,
94 apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::sync::LazyLock;
111
112use crate::optimizer::plan_node::generic::GenericPlanRef;
113
/// Data-less type whose associated functions are the logical optimization entry points.
pub struct LogicalOptimizer {}
115
116static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
117 OptimizationStage::new(
118 "DAG To Tree",
119 vec![DagToTreeRule::create()],
120 ApplyOrder::TopDown,
121 )
122});
123
124static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
125 OptimizationStage::new(
126 "Convert GENERATE_SERIES Ends With NOW",
127 vec![GenerateSeriesWithNowRule::create()],
128 ApplyOrder::TopDown,
129 )
130});
131
132static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
133 OptimizationStage::new(
134 "Table Function Convert",
135 vec![
136 TableFunctionToFileScanRule::create(),
138 TableFunctionToInternalBackfillProgressRule::create(),
140 TableFunctionToInternalSourceBackfillProgressRule::create(),
142 TableFunctionToInternalGetChannelDeltaStatsRule::create(),
144 TableFunctionToPostgresQueryRule::create(),
146 TableFunctionToMySqlQueryRule::create(),
148 TableFunctionToProjectSetRule::create(),
150 ],
151 ApplyOrder::TopDown,
152 )
153});
154
155static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
156 OptimizationStage::new(
157 "Table Function To FileScan",
158 vec![TableFunctionToFileScanRule::create()],
159 ApplyOrder::TopDown,
160 )
161});
162
163static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
164 OptimizationStage::new(
165 "Table Function To PostgresQuery",
166 vec![TableFunctionToPostgresQueryRule::create()],
167 ApplyOrder::TopDown,
168 )
169});
170
171static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
172 OptimizationStage::new(
173 "Table Function To MySQL",
174 vec![TableFunctionToMySqlQueryRule::create()],
175 ApplyOrder::TopDown,
176 )
177});
178
179static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
180 LazyLock::new(|| {
181 OptimizationStage::new(
182 "Table Function To Internal Backfill Progress",
183 vec![TableFunctionToInternalBackfillProgressRule::create()],
184 ApplyOrder::TopDown,
185 )
186 });
187
188static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
189 LazyLock::new(|| {
190 OptimizationStage::new(
191 "Table Function To Internal Source Backfill Progress",
192 vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
193 ApplyOrder::TopDown,
194 )
195 });
196
197static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
198 LazyLock::new(|| {
199 OptimizationStage::new(
200 "Table Function To Internal Get Channel Delta Stats",
201 vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
202 ApplyOrder::TopDown,
203 )
204 });
205
206static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
207 OptimizationStage::new(
208 "Values Extract Project",
209 vec![ValuesExtractProjectRule::create()],
210 ApplyOrder::TopDown,
211 )
212});
213
214static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
215 OptimizationStage::new(
216 "Simple Unnesting",
217 vec![
218 PullUpCorrelatedPredicateRule::create(),
220 PullUpCorrelatedProjectValueRule::create(),
222 PullUpCorrelatedPredicateAggRule::create(),
223 MaxOneRowEliminateRule::create(),
225 ApplyTableFunctionToProjectSetRule::create(),
227 ApplyToJoinRule::create(),
229 ],
230 ApplyOrder::BottomUp,
231 )
232});
233
234static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
235 OptimizationStage::new(
236 "Set Operation Merge",
237 vec![
238 UnionMergeRule::create(),
239 IntersectMergeRule::create(),
240 ExceptMergeRule::create(),
241 ],
242 ApplyOrder::BottomUp,
243 )
244});
245
246static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
247 LazyLock::new(|| {
248 OptimizationStage::new(
249 "General Unnesting(Translate Apply)",
250 vec![TranslateApplyRule::create(true)],
251 ApplyOrder::TopDown,
252 )
253 });
254
255static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
256 LazyLock::new(|| {
257 OptimizationStage::new(
258 "General Unnesting(Translate Apply)",
259 vec![TranslateApplyRule::create(false)],
260 ApplyOrder::TopDown,
261 )
262 });
263
264static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
265 OptimizationStage::new(
266 "General Unnesting(Push Down Apply)",
267 vec![
268 ApplyEliminateRule::create(),
269 ApplyAggTransposeRule::create(),
270 ApplyDedupTransposeRule::create(),
271 ApplyFilterTransposeRule::create(),
272 ApplyProjectTransposeRule::create(),
273 ApplyProjectSetTransposeRule::create(),
274 ApplyTopNTransposeRule::create(),
275 ApplyLimitTransposeRule::create(),
276 ApplyJoinTransposeRule::create(),
277 ApplyUnionTransposeRule::create(),
278 ApplyOverWindowTransposeRule::create(),
279 ApplyExpandTransposeRule::create(),
280 ApplyHopWindowTransposeRule::create(),
281 CrossJoinEliminateRule::create(),
282 ApplyShareEliminateRule::create(),
283 ],
284 ApplyOrder::TopDown,
285 )
286});
287
288static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
289 OptimizationStage::new(
290 "To MultiJoin",
291 vec![MergeMultiJoinRule::create()],
292 ApplyOrder::TopDown,
293 )
294});
295
296static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
297 OptimizationStage::new(
298 "Join Ordering".to_owned(),
299 vec![LeftDeepTreeJoinOrderingRule::create()],
300 ApplyOrder::TopDown,
301 )
302});
303
304static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
305 OptimizationStage::new(
306 "Join Ordering".to_owned(),
307 vec![BushyTreeJoinOrderingRule::create()],
308 ApplyOrder::TopDown,
309 )
310});
311
312static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
313 OptimizationStage::new(
314 "Push down filter with now into a left semijoin",
315 vec![
316 SplitNowAndRule::create(),
317 SplitNowOrRule::create(),
318 FilterWithNowToJoinRule::create(),
319 ],
320 ApplyOrder::TopDown,
321 )
322});
323
324static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
325 OptimizationStage::new(
326 "Push down the calculation of inputs of join's condition",
327 vec![PushCalculationOfJoinRule::create()],
328 ApplyOrder::TopDown,
329 )
330});
331
332static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
333 OptimizationStage::new(
334 "Convert Distinct Aggregation",
335 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
336 ApplyOrder::TopDown,
337 )
338});
339
340static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
341 OptimizationStage::new(
342 "Convert Distinct Aggregation",
343 vec![
344 UnionToDistinctRule::create(),
345 DistinctAggRule::create(false),
346 ],
347 ApplyOrder::TopDown,
348 )
349});
350
351static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
352 OptimizationStage::new(
353 "Simplify Aggregation",
354 vec![
355 AggGroupBySimplifyRule::create(),
356 AggCallMergeRule::create(),
357 UnifyFirstLastValueRule::create(),
358 ],
359 ApplyOrder::TopDown,
360 )
361});
362
363static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
364 OptimizationStage::new(
365 "Join Commute".to_owned(),
366 vec![JoinCommuteRule::create()],
367 ApplyOrder::TopDown,
368 )
369});
370
371static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
372 OptimizationStage::new(
373 "Constant Output Operator Remove",
374 vec![EmptyAggRemoveRule::create()],
375 ApplyOrder::TopDown,
376 )
377});
378
379static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
380 OptimizationStage::new(
381 "Project Remove",
382 vec![
383 ProjectMergeRule::create(),
385 ProjectEliminateRule::create(),
386 TrivialProjectToValuesRule::create(),
387 UnionInputValuesMergeRule::create(),
388 JoinProjectTransposeRule::create(),
389 ProjectJoinMergeRule::create(),
392 AggProjectMergeRule::create(),
393 ],
394 ApplyOrder::BottomUp,
395 )
396});
397
398static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
399 OptimizationStage::new(
400 "Split Over Window",
401 vec![OverWindowSplitRule::create()],
402 ApplyOrder::TopDown,
403 )
404});
405
406static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
410 OptimizationStage::new(
411 "Convert Over Window",
412 vec![
413 ProjectMergeRule::create(),
414 ProjectEliminateRule::create(),
415 TrivialProjectToValuesRule::create(),
416 UnionInputValuesMergeRule::create(),
417 OverWindowToAggAndJoinRule::create(),
418 OverWindowToTopNRule::create(),
419 ],
420 ApplyOrder::TopDown,
421 )
422});
423
424static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
426 OptimizationStage::new(
427 "Convert Over Window",
428 vec![
429 ProjectMergeRule::create(),
430 ProjectEliminateRule::create(),
431 TrivialProjectToValuesRule::create(),
432 UnionInputValuesMergeRule::create(),
433 OverWindowToAggAndJoinRule::create(),
434 ],
435 ApplyOrder::TopDown,
436 )
437});
438
439static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
440 OptimizationStage::new(
441 "Merge Over Window",
442 vec![OverWindowMergeRule::create()],
443 ApplyOrder::TopDown,
444 )
445});
446
447static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
448 OptimizationStage::new(
449 "Rewrite Like Expr",
450 vec![RewriteLikeExprRule::create()],
451 ApplyOrder::TopDown,
452 )
453});
454
455static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
456 OptimizationStage::new(
457 "TopN/SimpleAgg on Index",
458 vec![
459 TopNProjectTransposeRule::create(),
460 TopNOnIndexRule::create(),
461 MinMaxOnIndexRule::create(),
462 ],
463 ApplyOrder::TopDown,
464 )
465});
466
467static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
468 OptimizationStage::new(
469 "Project TopN Transpose",
470 vec![ProjectTopNTransposeRule::create()],
471 ApplyOrder::TopDown,
472 )
473});
474
475static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
476 OptimizationStage::new(
477 "Void always-false filter's downstream",
478 vec![AlwaysFalseFilterRule::create()],
479 ApplyOrder::TopDown,
480 )
481});
482
483static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
484 OptimizationStage::new(
485 "Push Down Limit",
486 vec![LimitPushDownRule::create()],
487 ApplyOrder::TopDown,
488 )
489});
490
491static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
492 OptimizationStage::new(
493 "Pull Up Hop",
494 vec![PullUpHopRule::create()],
495 ApplyOrder::BottomUp,
496 )
497});
498
499static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
500 OptimizationStage::new(
501 "Set Operation To Join",
502 vec![
503 IntersectToSemiJoinRule::create(),
504 ExceptToAntiJoinRule::create(),
505 ],
506 ApplyOrder::BottomUp,
507 )
508});
509
510static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
511 OptimizationStage::new(
512 "Grouping Sets",
513 vec![
514 GroupingSetsToExpandRule::create(),
515 ExpandToProjectRule::create(),
516 ],
517 ApplyOrder::TopDown,
518 )
519});
520
521static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
522 OptimizationStage::new(
523 "Common Sub Expression Extract",
524 vec![CommonSubExprExtractRule::create()],
525 ApplyOrder::TopDown,
526 )
527});
528
529static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
530 OptimizationStage::new(
531 "Logical Filter Expression Simplify",
532 vec![LogicalFilterExpressionSimplifyRule::create()],
533 ApplyOrder::TopDown,
534 )
535});
536
537static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
538 OptimizationStage::new(
539 "Rewrite Source For Batch",
540 vec![
541 SourceToKafkaScanRule::create(),
542 SourceToIcebergIntermediateScanRule::create(),
545 ],
546 ApplyOrder::TopDown,
547 )
548});
549
550static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
551 OptimizationStage::new(
552 "Materialize Iceberg Scan",
553 vec![IcebergIntermediateScanRule::create()],
554 ApplyOrder::TopDown,
555 )
556});
557
558static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
559 OptimizationStage::new(
560 "Iceberg Count Star Optimization",
561 vec![IcebergCountStarRule::create()],
562 ApplyOrder::BottomUp,
563 )
564});
565
566static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
567 OptimizationStage::new(
568 "TopN to Vector Search",
569 vec![TopNToVectorSearchRule::create()],
570 ApplyOrder::BottomUp,
571 )
572});
573
574static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
575 LazyLock::new(|| {
576 OptimizationStage::new(
577 "Correlated TopN to Vector Search",
578 vec![CorrelatedTopNToVectorSearchRule::create(true)],
579 ApplyOrder::BottomUp,
580 )
581 });
582
583static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
584 LazyLock::new(|| {
585 OptimizationStage::new(
586 "Correlated TopN to Vector Search",
587 vec![CorrelatedTopNToVectorSearchRule::create(false)],
588 ApplyOrder::BottomUp,
589 )
590 });
591
592impl LogicalOptimizer {
593 pub fn predicate_pushdown(
594 plan: LogicalPlanRef,
595 explain_trace: bool,
596 ctx: &OptimizerContextRef,
597 ) -> LogicalPlanRef {
598 let plan = plan.predicate_pushdown(
599 Condition::true_cond(),
600 &mut PredicatePushdownContext::new(plan.clone()),
601 );
602 if explain_trace {
603 ctx.trace("Predicate Push Down:");
604 ctx.trace(plan.explain_to_string());
605 }
606 plan
607 }
608
609 pub fn subquery_unnesting(
610 mut plan: LogicalPlanRef,
611 enable_share_plan: bool,
612 explain_trace: bool,
613 ctx: &OptimizerContextRef,
614 ) -> Result<LogicalPlanRef> {
615 if !has_logical_apply(plan.clone()) {
617 return Ok(plan);
618 }
619 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
621 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
622 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
625 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
627 plan = if enable_share_plan {
631 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
632 } else {
633 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
634 };
635 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
636
637 plan.check_apply_elimination()?;
639
640 Ok(plan)
641 }
642
643 pub fn column_pruning(
644 mut plan: LogicalPlanRef,
645 explain_trace: bool,
646 ctx: &OptimizerContextRef,
647 ) -> LogicalPlanRef {
648 let required_cols = (0..plan.schema().len()).collect_vec();
649 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
650 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
651 if explain_trace {
653 ctx.trace("Prune Columns:");
654 ctx.trace(plan.explain_to_string());
655 }
656
657 if column_pruning_ctx.need_second_round() {
658 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
661 if explain_trace {
662 ctx.trace("Prune Columns (For DAG):");
663 ctx.trace(plan.explain_to_string());
664 }
665 }
666 plan
667 }
668
669 pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
670 let mut v = NowProcTimeFinder::default();
672 plan.visit_exprs_recursive(&mut v);
673 if !v.has() {
674 return plan;
675 }
676
677 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
678
679 let plan = plan.rewrite_exprs_recursive(&mut v);
680
681 if ctx.is_explain_trace() {
682 ctx.trace("Inline Now and ProcTime:");
683 ctx.trace(plan.explain_to_string());
684 }
685 plan
686 }
687
688 pub fn gen_optimized_logical_plan_for_stream(
689 mut plan: LogicalPlanRef,
690 ) -> Result<LogicalPlanRef> {
691 let ctx = plan.ctx();
692 let explain_trace = ctx.is_explain_trace();
693
694 if explain_trace {
695 ctx.trace("Begin:");
696 ctx.trace(plan.explain_to_string());
697 }
698
699 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
701 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
703 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
705
706 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
710 if enable_share_plan {
711 plan = plan.common_subplan_sharing();
713 plan = plan.prune_share();
714 if explain_trace {
715 ctx.trace("Common Sub-plan Sharing:");
716 ctx.trace(plan.explain_to_string());
717 }
718 } else {
719 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
720
721 plan = ShareSourceRewriter::share_source(plan);
725 if explain_trace {
726 ctx.trace("Share Source:");
727 ctx.trace(plan.explain_to_string());
728 }
729 }
730 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
731 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
732 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
735 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
737
738 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;
739
740 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
741 if has_logical_max_one_row(plan.clone()) {
742 bail!("Scalar subquery might produce more than one row.");
746 }
747
748 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
751
752 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
754
755 if plan.ctx().session_ctx().config().enable_join_ordering() {
756 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
760
761 if plan
763 .ctx()
764 .session_ctx()
765 .config()
766 .streaming_enable_bushy_join()
767 {
768 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
769 } else {
770 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
771 }
772 }
773
774 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
777
778 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
780
781 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
783
784 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
785 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
788 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
789 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
790
791 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
792 plan = if force_split_distinct_agg {
795 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
796 } else {
797 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
798 };
799
800 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
801
802 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
803
804 plan = Self::column_pruning(plan, explain_trace, &ctx);
806 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
807
808 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
809 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
810
811 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
812
813 #[cfg(debug_assertions)]
814 InputRefValidator.validate(plan.clone());
815
816 ctx.may_store_explain_logical(&plan);
817
818 Ok(plan)
819 }
820
821 pub fn gen_optimized_logical_plan_for_batch(
822 mut plan: LogicalPlanRef,
823 ) -> Result<LogicalPlanRef> {
824 let ctx = plan.ctx();
825 let explain_trace = ctx.is_explain_trace();
826
827 if explain_trace {
828 ctx.trace("Begin:");
829 ctx.trace(plan.explain_to_string());
830 }
831
832 plan = Self::inline_now_proc_time(plan, &ctx);
834
835 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
837
838 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
839 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
840 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
841 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
842 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
843 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
844 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
846 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
847 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
848 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
849 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
850 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
851 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
853
854 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
855
856 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
857
858 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
862
863 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
865 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
866
867 if plan.ctx().session_ctx().config().enable_join_ordering() {
868 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
872
873 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
875 }
876
877 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
880 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
881 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
882 }
883
884 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
886
887 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
888 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
891 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
892 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
893 }
894 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
895 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
896
897 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
899
900 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
901
902 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
903
904 plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
905
906 plan = Self::column_pruning(plan, explain_trace, &ctx);
908 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
909 (#[allow(unused_assignments)]
910 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
911 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
912 }
913
914 plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;
918
919 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
920 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
921
922 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
923
924 plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;
926
927 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
928
929 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
930
931 plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;
932
933 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
934
935 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
936
937 #[cfg(debug_assertions)]
938 InputRefValidator.validate(plan.clone());
939
940 ctx.may_store_explain_logical(&plan);
941
942 Ok(plan)
943 }
944}