1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Binder, Explain, OptimizerContextRef, Planner};
37
38impl<C: ConventionMarker> PlanRef<C> {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42 stage_name: &str,
43 ) -> Result<PlanRef<C>> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage<C>,
69 ) -> Result<PlanRef<C>> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage<C>,
80 ) -> Result<PlanRef<C>> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
/// A named group of rewrite rules together with the traversal order that is handed to a
/// `HeuristicOptimizer` when the stage is run.
///
/// The convention marker `C` defaults to `Logical`.
pub struct OptimizationStage<C: ConventionMarker = Logical> {
    /// Label written to the explain trace in front of this stage's output.
    stage_name: String,
    /// Rules passed to the `HeuristicOptimizer` for this stage.
    rules: Vec<BoxedRule<C>>,
    /// Traversal order passed to the `HeuristicOptimizer` for this stage.
    apply_order: ApplyOrder,
}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::collections::{HashMap, HashSet};
111use std::sync::LazyLock;
112
113use risingwave_common::id::ObjectId;
114use risingwave_pb::common::PbObjectType;
115use risingwave_sqlparser::ast::Statement;
116
117use crate::optimizer::plan_node::generic::GenericPlanRef;
118use crate::optimizer::plan_visitor::RelationCollectorVisitor;
119use crate::session::SessionImpl;
120
/// Field-less type whose associated functions implement the logical optimization passes
/// and the stream/batch logical optimization pipelines.
pub struct LogicalOptimizer {}
122
/// Stage "DAG To Tree": runs `DagToTreeRule` with `ApplyOrder::TopDown`.
static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "DAG To Tree",
        vec![DagToTreeRule::create()],
        ApplyOrder::TopDown,
    )
});
130
/// Stage "Convert GENERATE_SERIES Ends With NOW": runs `GenerateSeriesWithNowRule` with
/// `ApplyOrder::TopDown`. Applied by the stream pipeline.
static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert GENERATE_SERIES Ends With NOW",
        vec![GenerateSeriesWithNowRule::create()],
        ApplyOrder::TopDown,
    )
});
138
/// Stage "Table Function Convert": bundles all `TableFunctionTo*` rules into one top-down
/// stage. `TableFunctionToProjectSetRule` is listed last, after the more specific rules.
static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function Convert",
        vec![
            TableFunctionToFileScanRule::create(),
            TableFunctionToInternalBackfillProgressRule::create(),
            TableFunctionToInternalSourceBackfillProgressRule::create(),
            TableFunctionToInternalGetChannelDeltaStatsRule::create(),
            TableFunctionToPostgresQueryRule::create(),
            TableFunctionToMySqlQueryRule::create(),
            TableFunctionToProjectSetRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
161
/// Stage "Table Function To FileScan": runs `TableFunctionToFileScanRule` on its own with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To FileScan",
        vec![TableFunctionToFileScanRule::create()],
        ApplyOrder::TopDown,
    )
});
169
/// Stage "Table Function To PostgresQuery": runs `TableFunctionToPostgresQueryRule` on its
/// own with `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To PostgresQuery",
        vec![TableFunctionToPostgresQueryRule::create()],
        ApplyOrder::TopDown,
    )
});
177
/// Stage "Table Function To MySQL": runs `TableFunctionToMySqlQueryRule` on its own with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To MySQL",
        vec![TableFunctionToMySqlQueryRule::create()],
        ApplyOrder::TopDown,
    )
});
185
/// Stage "Table Function To Internal Backfill Progress": runs
/// `TableFunctionToInternalBackfillProgressRule` on its own with `ApplyOrder::TopDown`.
/// Applied by the batch pipeline.
static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Table Function To Internal Backfill Progress",
            vec![TableFunctionToInternalBackfillProgressRule::create()],
            ApplyOrder::TopDown,
        )
    });
194
/// Stage "Table Function To Internal Source Backfill Progress": runs
/// `TableFunctionToInternalSourceBackfillProgressRule` on its own with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Table Function To Internal Source Backfill Progress",
            vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
            ApplyOrder::TopDown,
        )
    });
203
/// Stage "Table Function To Internal Get Channel Delta Stats": runs
/// `TableFunctionToInternalGetChannelDeltaStatsRule` on its own with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Table Function To Internal Get Channel Delta Stats",
            vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
            ApplyOrder::TopDown,
        )
    });
212
/// Stage "Values Extract Project": runs `ValuesExtractProjectRule` with
/// `ApplyOrder::TopDown`. Applied during subquery unnesting.
static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Values Extract Project",
        vec![ValuesExtractProjectRule::create()],
        ApplyOrder::TopDown,
    )
});
220
/// Stage "Simple Unnesting": the first rule set tried by `subquery_unnesting`, applied
/// with `ApplyOrder::BottomUp`. It combines the `PullUpCorrelated*` rules,
/// `MaxOneRowEliminateRule`, `ApplyTableFunctionToProjectSetRule` and `ApplyToJoinRule`.
static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Simple Unnesting",
        vec![
            PullUpCorrelatedPredicateRule::create(),
            PullUpCorrelatedProjectValueRule::create(),
            PullUpCorrelatedPredicateAggRule::create(),
            MaxOneRowEliminateRule::create(),
            ApplyTableFunctionToProjectSetRule::create(),
            ApplyToJoinRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});
240
/// Stage "Set Operation Merge": runs the union/intersect/except merge rules with
/// `ApplyOrder::BottomUp`.
static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Set Operation Merge",
        vec![
            UnionMergeRule::create(),
            IntersectMergeRule::create(),
            ExceptMergeRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});
252
/// Stage "General Unnesting(Translate Apply)", variant built with
/// `TranslateApplyRule::create(true)`. `subquery_unnesting` selects it when
/// `enable_share_plan` is true. It shares its trace name with the `WITHOUT_SHARE` variant.
static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "General Unnesting(Translate Apply)",
            vec![TranslateApplyRule::create(true)],
            ApplyOrder::TopDown,
        )
    });
261
/// Stage "General Unnesting(Translate Apply)", variant built with
/// `TranslateApplyRule::create(false)`. `subquery_unnesting` selects it when
/// `enable_share_plan` is false. It shares its trace name with the `WITH_SHARE` variant.
static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "General Unnesting(Translate Apply)",
            vec![TranslateApplyRule::create(false)],
            ApplyOrder::TopDown,
        )
    });
270
/// Stage "General Unnesting(Push Down Apply)": the `Apply*Transpose` / `Apply*Eliminate`
/// rules plus `CrossJoinEliminateRule`, applied with `ApplyOrder::TopDown`.
/// `subquery_unnesting` runs this stage until a pass applies no rule.
static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "General Unnesting(Push Down Apply)",
        vec![
            ApplyEliminateRule::create(),
            ApplyAggTransposeRule::create(),
            ApplyDedupTransposeRule::create(),
            ApplyFilterTransposeRule::create(),
            ApplyProjectTransposeRule::create(),
            ApplyProjectSetTransposeRule::create(),
            ApplyTopNTransposeRule::create(),
            ApplyLimitTransposeRule::create(),
            ApplyJoinTransposeRule::create(),
            ApplyUnionTransposeRule::create(),
            ApplyOverWindowTransposeRule::create(),
            ApplyExpandTransposeRule::create(),
            ApplyHopWindowTransposeRule::create(),
            CrossJoinEliminateRule::create(),
            ApplyShareEliminateRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
294
/// Stage "To MultiJoin": runs `MergeMultiJoinRule` with `ApplyOrder::TopDown`. Both
/// pipelines apply it right before a join-ordering stage when join ordering is enabled.
static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "To MultiJoin",
        vec![MergeMultiJoinRule::create()],
        ApplyOrder::TopDown,
    )
});
302
303static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
304 OptimizationStage::new(
305 "Join Ordering".to_owned(),
306 vec![LeftDeepTreeJoinOrderingRule::create()],
307 ApplyOrder::TopDown,
308 )
309});
310
311static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
312 OptimizationStage::new(
313 "Join Ordering".to_owned(),
314 vec![BushyTreeJoinOrderingRule::create()],
315 ApplyOrder::TopDown,
316 )
317});
318
/// Stage "Push down filter with now into a left semijoin": runs `SplitNowAndRule`,
/// `SplitNowOrRule` and `FilterWithNowToJoinRule` with `ApplyOrder::TopDown`. Applied by
/// the stream pipeline.
static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push down filter with now into a left semijoin",
        vec![
            SplitNowAndRule::create(),
            SplitNowOrRule::create(),
            FilterWithNowToJoinRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
330
/// Stage "Push down the calculation of inputs of join's condition": runs
/// `PushCalculationOfJoinRule` with `ApplyOrder::TopDown`.
static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push down the calculation of inputs of join's condition",
        vec![PushCalculationOfJoinRule::create()],
        ApplyOrder::TopDown,
    )
});
338
/// Stage "Convert Distinct Aggregation", stream variant: `UnionToDistinctRule` followed by
/// `DistinctAggRule::create(true)`, applied with `ApplyOrder::TopDown`.
static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Distinct Aggregation",
        vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
        ApplyOrder::TopDown,
    )
});
346
/// Stage "Convert Distinct Aggregation", batch variant: `UnionToDistinctRule` followed by
/// `DistinctAggRule::create(false)`, applied with `ApplyOrder::TopDown`. The stream
/// pipeline also selects this stage when `force_split_distinct_agg` is set.
static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Distinct Aggregation",
        vec![
            UnionToDistinctRule::create(),
            DistinctAggRule::create(false),
        ],
        ApplyOrder::TopDown,
    )
});
357
/// Stage "Simplify Aggregation": runs `AggGroupBySimplifyRule`, `AggCallMergeRule` and
/// `UnifyFirstLastValueRule` with `ApplyOrder::TopDown`.
static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Simplify Aggregation",
        vec![
            AggGroupBySimplifyRule::create(),
            AggCallMergeRule::create(),
            UnifyFirstLastValueRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
369
370static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
371 OptimizationStage::new(
372 "Join Commute".to_owned(),
373 vec![JoinCommuteRule::create()],
374 ApplyOrder::TopDown,
375 )
376});
377
/// Stage "Constant Output Operator Remove": runs `EmptyAggRemoveRule` with
/// `ApplyOrder::TopDown`.
static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Constant Output Operator Remove",
        vec![EmptyAggRemoveRule::create()],
        ApplyOrder::TopDown,
    )
});
385
/// Stage "Project Remove": project merge/eliminate rules together with related
/// values/union/join/agg project rules, applied with `ApplyOrder::BottomUp`.
static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Project Remove",
        vec![
            ProjectMergeRule::create(),
            ProjectEliminateRule::create(),
            TrivialProjectToValuesRule::create(),
            UnionInputValuesMergeRule::create(),
            JoinProjectTransposeRule::create(),
            ProjectJoinMergeRule::create(),
            AggProjectMergeRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});
404
/// Stage "Split Over Window": runs `OverWindowSplitRule` with `ApplyOrder::TopDown`.
static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Split Over Window",
        vec![OverWindowSplitRule::create()],
        ApplyOrder::TopDown,
    )
});
412
/// Stage "Convert Over Window" as applied by the stream pipeline: project clean-up rules
/// plus `OverWindowToAggAndJoinRule` and `OverWindowToTopNRule`, with
/// `ApplyOrder::TopDown`.
static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Over Window",
        vec![
            ProjectMergeRule::create(),
            ProjectEliminateRule::create(),
            TrivialProjectToValuesRule::create(),
            UnionInputValuesMergeRule::create(),
            OverWindowToAggAndJoinRule::create(),
            OverWindowToTopNRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
430
/// Stage "Convert Over Window" as applied by the batch pipeline: the same rule list as
/// `CONVERT_OVER_WINDOW` except that `OverWindowToTopNRule` is not included.
static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Convert Over Window",
        vec![
            ProjectMergeRule::create(),
            ProjectEliminateRule::create(),
            TrivialProjectToValuesRule::create(),
            UnionInputValuesMergeRule::create(),
            OverWindowToAggAndJoinRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
445
/// Stage "Merge Over Window": runs `OverWindowMergeRule` with `ApplyOrder::TopDown`.
static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Merge Over Window",
        vec![OverWindowMergeRule::create()],
        ApplyOrder::TopDown,
    )
});
453
/// Stage "Rewrite Like Expr": runs `RewriteLikeExprRule` with `ApplyOrder::TopDown`.
/// Applied by the batch pipeline.
static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Rewrite Like Expr",
        vec![RewriteLikeExprRule::create()],
        ApplyOrder::TopDown,
    )
});
461
/// Stage "TopN/SimpleAgg on Index": runs `TopNProjectTransposeRule`, `TopNOnIndexRule` and
/// `MinMaxOnIndexRule` with `ApplyOrder::TopDown`. Applied by the batch pipeline.
static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "TopN/SimpleAgg on Index",
        vec![
            TopNProjectTransposeRule::create(),
            TopNOnIndexRule::create(),
            MinMaxOnIndexRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
473
/// Stage "Project TopN Transpose": runs `ProjectTopNTransposeRule` with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Project TopN Transpose",
        vec![ProjectTopNTransposeRule::create()],
        ApplyOrder::TopDown,
    )
});
481
/// Stage "Void always-false filter's downstream": runs `AlwaysFalseFilterRule` with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Void always-false filter's downstream",
        vec![AlwaysFalseFilterRule::create()],
        ApplyOrder::TopDown,
    )
});
489
/// Stage "Push Down Limit": runs `LimitPushDownRule` with `ApplyOrder::TopDown`. Applied
/// by the batch pipeline.
static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Push Down Limit",
        vec![LimitPushDownRule::create()],
        ApplyOrder::TopDown,
    )
});
497
/// Stage "Pull Up Hop": runs `PullUpHopRule` with `ApplyOrder::BottomUp`. Applied by the
/// batch pipeline.
static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Pull Up Hop",
        vec![PullUpHopRule::create()],
        ApplyOrder::BottomUp,
    )
});
505
/// Stage "Set Operation To Join": runs `IntersectToSemiJoinRule` and
/// `ExceptToAntiJoinRule` with `ApplyOrder::BottomUp`.
static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Set Operation To Join",
        vec![
            IntersectToSemiJoinRule::create(),
            ExceptToAntiJoinRule::create(),
        ],
        ApplyOrder::BottomUp,
    )
});
516
/// Stage "Grouping Sets": runs `GroupingSetsToExpandRule` and `ExpandToProjectRule` with
/// `ApplyOrder::TopDown`.
static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Grouping Sets",
        vec![
            GroupingSetsToExpandRule::create(),
            ExpandToProjectRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
527
/// Stage "Common Sub Expression Extract": runs `CommonSubExprExtractRule` with
/// `ApplyOrder::TopDown`.
static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Common Sub Expression Extract",
        vec![CommonSubExprExtractRule::create()],
        ApplyOrder::TopDown,
    )
});
535
/// Stage "Logical Filter Expression Simplify": runs `LogicalFilterExpressionSimplifyRule`
/// with `ApplyOrder::TopDown`.
static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Logical Filter Expression Simplify",
        vec![LogicalFilterExpressionSimplifyRule::create()],
        ApplyOrder::TopDown,
    )
});
543
/// Stage "Rewrite Source For Batch": runs `SourceToKafkaScanRule` and
/// `SourceToIcebergIntermediateScanRule` with `ApplyOrder::TopDown`. Applied by the batch
/// pipeline.
static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Rewrite Source For Batch",
        vec![
            SourceToKafkaScanRule::create(),
            SourceToIcebergIntermediateScanRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
556
/// Stage "Materialize Iceberg Scan": runs `IcebergIntermediateScanRule` with
/// `ApplyOrder::TopDown`. Applied by the batch pipeline.
static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Materialize Iceberg Scan",
        vec![IcebergIntermediateScanRule::create()],
        ApplyOrder::TopDown,
    )
});
564
/// Stage "Iceberg Count Star Optimization": runs `IcebergCountStarRule` with
/// `ApplyOrder::BottomUp`. Applied by the batch pipeline.
static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Iceberg Count Star Optimization",
        vec![IcebergCountStarRule::create()],
        ApplyOrder::BottomUp,
    )
});
572
/// Stage "TopN to Vector Search": runs `TopNToVectorSearchRule` with
/// `ApplyOrder::BottomUp`. Applied by the batch pipeline.
static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "TopN to Vector Search",
        vec![TopNToVectorSearchRule::create()],
        ApplyOrder::BottomUp,
    )
});
580
/// Stage "Correlated TopN to Vector Search", batch variant: built with
/// `CorrelatedTopNToVectorSearchRule::create(true)` and applied with
/// `ApplyOrder::BottomUp`.
static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Correlated TopN to Vector Search",
            vec![CorrelatedTopNToVectorSearchRule::create(true)],
            ApplyOrder::BottomUp,
        )
    });
589
/// Stage "Correlated TopN to Vector Search", stream variant: built with
/// `CorrelatedTopNToVectorSearchRule::create(false)` and applied with
/// `ApplyOrder::BottomUp`.
static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
    LazyLock::new(|| {
        OptimizationStage::new(
            "Correlated TopN to Vector Search",
            vec![CorrelatedTopNToVectorSearchRule::create(false)],
            ApplyOrder::BottomUp,
        )
    });
598
/// Stage "Batch Mv Selection": runs `MvSelectionRule` with `ApplyOrder::TopDown`. The
/// batch pipeline applies it only when `enable_mv_selection` is set, after registering
/// candidate materialized views on the optimizer context.
static BATCH_MV_SELECTION: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Batch Mv Selection",
        vec![MvSelectionRule::create()],
        ApplyOrder::TopDown,
    )
});
606
607impl LogicalOptimizer {
608 pub fn predicate_pushdown(
609 plan: LogicalPlanRef,
610 explain_trace: bool,
611 ctx: &OptimizerContextRef,
612 ) -> LogicalPlanRef {
613 let plan = plan.predicate_pushdown(
614 Condition::true_cond(),
615 &mut PredicatePushdownContext::new(plan.clone()),
616 );
617 if explain_trace {
618 ctx.trace("Predicate Push Down:");
619 ctx.trace(plan.explain_to_string());
620 }
621 plan
622 }
623
624 pub fn subquery_unnesting(
625 mut plan: LogicalPlanRef,
626 enable_share_plan: bool,
627 explain_trace: bool,
628 ctx: &OptimizerContextRef,
629 ) -> Result<LogicalPlanRef> {
630 if !has_logical_apply(plan.clone()) {
632 return Ok(plan);
633 }
634 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
636 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
637 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
640 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
642 plan = if enable_share_plan {
646 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
647 } else {
648 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
649 };
650 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
651
652 plan.check_apply_elimination()?;
654
655 Ok(plan)
656 }
657
658 pub fn column_pruning(
659 mut plan: LogicalPlanRef,
660 explain_trace: bool,
661 ctx: &OptimizerContextRef,
662 ) -> LogicalPlanRef {
663 let required_cols = (0..plan.schema().len()).collect_vec();
664 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
665 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
666 if explain_trace {
668 ctx.trace("Prune Columns:");
669 ctx.trace(plan.explain_to_string());
670 }
671
672 if column_pruning_ctx.need_second_round() {
673 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
676 if explain_trace {
677 ctx.trace("Prune Columns (For DAG):");
678 ctx.trace(plan.explain_to_string());
679 }
680 }
681 plan
682 }
683
684 pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
685 let mut v = NowProcTimeFinder::default();
687 plan.visit_exprs_recursive(&mut v);
688 if !v.has() {
689 return plan;
690 }
691
692 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
693
694 let plan = plan.rewrite_exprs_recursive(&mut v);
695
696 if ctx.is_explain_trace() {
697 ctx.trace("Inline Now and ProcTime:");
698 ctx.trace(plan.explain_to_string());
699 }
700 plan
701 }
702
    /// Runs the logical optimization pipeline for a streaming plan.
    ///
    /// The stages below are applied strictly in the order written; each one consumes the
    /// plan produced by the previous one. In explain-trace mode, intermediate plans are
    /// written to the optimizer context's trace.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any stage or by subquery unnesting. It also fails
    /// with "Scalar subquery might produce more than one row." when
    /// `has_logical_max_one_row` reports true for the plan after unnesting.
    pub fn gen_optimized_logical_plan_for_stream(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Initial clean-up: grouping sets, constant-output operators, redundant projects.
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        // With plan sharing enabled, share common sub-plans; otherwise convert the DAG to a
        // tree and then only run the share-source rewriter.
        let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
        if enable_share_plan {
            plan = plan.common_subplan_sharing();
            plan = plan.prune_share();
            if explain_trace {
                ctx.trace("Common Sub-plan Sharing:");
                ctx.trace(plan.explain_to_string());
            }
        } else {
            plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

            plan = ShareSourceRewriter::share_source(plan);
            if explain_trace {
                ctx.trace("Share Source:");
                ctx.trace(plan.explain_to_string());
            }
        }
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
        // Table functions are converted before subquery unnesting runs.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        // Reject plans for which the max-one-row check still reports true after unnesting.
        if has_logical_max_one_row(plan.clone()) {
            bail!("Scalar subquery might produce more than one row.");
        }

        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // Join ordering (optional): merge into a multi-join, then reorder either as a bushy
        // tree or as a left-deep tree depending on the session config.
        if plan.ctx().session_ctx().config().enable_join_ordering() {
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            if plan
                .ctx()
                .session_ctx()
                .config()
                .streaming_enable_bushy_join()
            {
                plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
            } else {
                plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
            }
        }

        // Unlike the batch pipeline, pushdown is re-run here unconditionally.
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;

        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        // Over-window handling: split, push predicates again, convert, then merge.
        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        // With `force_split_distinct_agg` the `_FOR_BATCH` stage is used, i.e.
        // `DistinctAggRule::create(false)` instead of `create(true)`.
        // NOTE(review): presumably intentional for a streaming plan - confirm.
        let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
        plan = if force_split_distinct_agg {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
        } else {
            plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
        };

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        // Prune columns, then push predicates once more on the pruned plan.
        plan = Self::column_pruning(plan, explain_trace, &ctx);
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // Final clean-up; repeats the two clean-up stages from the start of the pipeline.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        // Debug builds only: validate the final plan with `InputRefValidator`.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
835
    /// Runs the logical optimization pipeline for a batch plan.
    ///
    /// The stages below are applied strictly in the order written; each one consumes the
    /// plan produced by the previous one. After the first predicate pushdown, later
    /// pushdowns are skipped unless the context's applied-rule counter has changed since the
    /// previous pushdown.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any stage or by subquery unnesting.
    pub fn gen_optimized_logical_plan_for_batch(
        mut plan: LogicalPlanRef,
    ) -> Result<LogicalPlanRef> {
        let ctx = plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        if explain_trace {
            ctx.trace("Begin:");
            ctx.trace(plan.explain_to_string());
        }

        // Optional MV selection: collect the relations referenced by the query, register
        // candidate materialized views on the context, then run the selection rule.
        if ctx.session_ctx().config().enable_mv_selection() {
            let query_relations =
                RelationCollectorVisitor::collect_with(HashSet::new(), plan.clone());
            Self::register_batch_mview_candidates(ctx.session_ctx(), &ctx, &query_relations);
            plan = plan.optimize_by_rules(&BATCH_MV_SELECTION)?;
        }

        plan = Self::inline_now_proc_time(plan, &ctx);

        // Batch always converts the DAG to a tree first.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&GROUPING_SETS)?;
        plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
        plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
        plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
        // The specialised table-function stages run one by one, followed by the combined
        // `TABLE_FUNCTION_CONVERT` stage, all before subquery unnesting.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

        plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;

        // Batch always unnests with `enable_share_plan = false`.
        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;

        plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;

        // Snapshot of the applied-rule counter; used below to decide whether another
        // predicate pushdown is needed.
        let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
        plan = Self::predicate_pushdown(plan, explain_trace, &ctx);

        // Join ordering (optional): merge into a multi-join, then reorder as a left-deep
        // tree.
        if plan.ctx().session_ctx().config().enable_join_ordering() {
            plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;

            plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
        }

        // Push predicates again only if some rule was applied since the snapshot.
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;

        // Over-window handling: split, conditional pushdown, convert, then merge.
        plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }
        plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
        plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;

        plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;

        plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;

        plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;

        plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;

        plan = Self::column_pruning(plan, explain_trace, &ctx);
        if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
            // The snapshot is not read again after this point, hence the `allow`.
            (#[allow(unused_assignments)]
            last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
            plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
        }

        plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;

        // Final clean-up stages.
        plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
        plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;

        plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;

        plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;

        plan = plan.optimize_by_rules(&PULL_UP_HOP)?;

        plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;

        plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;

        plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;

        // `DAG_TO_TREE` is applied a second time at the very end of the pipeline.
        plan = plan.optimize_by_rules(&DAG_TO_TREE)?;

        // Debug builds only: validate the final plan with `InputRefValidator`.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        ctx.may_store_explain_logical(&plan);

        Ok(plan)
    }
966
967 fn register_batch_mview_candidates(
968 session: &SessionImpl,
969 context: &OptimizerContextRef,
970 query_relations: &HashSet<ObjectId>,
971 ) {
972 let catalog_reader = session.env().catalog_reader().read_guard();
973 let user_reader = session.env().user_info_reader().read_guard();
974 let Some(current_user) = user_reader.get_user_by_name(&session.user_name()) else {
975 return;
976 };
977 let mut mv_dependencies: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
978 let mut mviews_with_source_dependency: HashSet<ObjectId> = HashSet::new();
979 for dep in catalog_reader.iter_object_dependencies() {
980 mv_dependencies
981 .entry(dep.object_id)
982 .or_default()
983 .insert(dep.referenced_object_id);
984 if dep.referenced_object_type == PbObjectType::Source {
985 mviews_with_source_dependency.insert(dep.object_id);
986 }
987 }
988 let db_name = session.database();
989 let Ok(schemas) = catalog_reader.iter_schemas(&db_name) else {
990 return;
991 };
992
993 for schema in schemas {
994 for mv in schema.iter_created_mvs_with_acl(current_user) {
995 let mv_object_id = mv.id().as_object_id();
996 if mviews_with_source_dependency.contains(&mv_object_id) {
997 continue;
998 }
999 let is_subset = mv_dependencies
1000 .get(&mv_object_id)
1001 .is_some_and(|deps| deps.is_subset(query_relations));
1002 if !is_subset {
1003 continue;
1004 }
1005 let Ok(stmt) = mv.create_sql_ast() else {
1006 continue;
1007 };
1008 let Statement::CreateView {
1009 materialized: true,
1010 query,
1011 ..
1012 } = stmt
1013 else {
1014 continue;
1015 };
1016 let mut binder = Binder::new_for_batch(session);
1017 let Ok(bound_query) = binder.bind_query(&query) else {
1018 continue;
1019 };
1020 let mut planner = Planner::new_for_batch_dql(context.clone());
1021 let Ok(plan_root) = planner.plan_query(bound_query) else {
1022 continue;
1023 };
1024 context.add_batch_mview_candidate(mv.clone(), plan_root.plan.clone());
1025 }
1026 }
1027 }
1028}