1use itertools::Itertools;
16use risingwave_common::bail;
17use thiserror_ext::AsReport as _;
18
19use super::plan_node::{ConventionMarker, Logical, LogicalPlanRef};
20use super::plan_visitor::has_logical_max_one_row;
21use crate::error::Result;
22use crate::expr::NowProcTimeFinder;
23use crate::optimizer::PlanRef;
24use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
27};
28use crate::optimizer::plan_rewriter::ShareSourceRewriter;
29#[cfg(debug_assertions)]
30use crate::optimizer::plan_visitor::InputRefValidator;
31use crate::optimizer::plan_visitor::{
32 HasMaxOneRowApply, PlanCheckApplyEliminationExt, PlanVisitor, has_logical_apply,
33};
34use crate::optimizer::rule::*;
35use crate::utils::Condition;
36use crate::{Binder, Explain, OptimizerContextRef, Planner};
37
38impl<C: ConventionMarker> PlanRef<C> {
39 fn optimize_by_rules_inner(
40 self,
41 heuristic_optimizer: &mut HeuristicOptimizer<'_, C>,
42 stage_name: &str,
43 ) -> Result<PlanRef<C>> {
44 let ctx = self.ctx();
45
46 let result = heuristic_optimizer.optimize(self);
47 let stats = heuristic_optimizer.get_stats();
48
49 if ctx.is_explain_trace() && stats.has_applied_rule() {
50 ctx.trace(format!("{}:", stage_name));
51 ctx.trace(format!("{}", stats));
52 ctx.trace(match &result {
53 Ok(plan) => plan.explain_to_string(),
54 Err(error) => format!("Optimization failed: {}", error.as_report()),
55 });
56 }
57 ctx.add_rule_applied(stats.total_applied());
58
59 result
60 }
61
62 pub(crate) fn optimize_by_rules(
63 self,
64 OptimizationStage {
65 stage_name,
66 rules,
67 apply_order,
68 }: &OptimizationStage<C>,
69 ) -> Result<PlanRef<C>> {
70 self.optimize_by_rules_inner(&mut HeuristicOptimizer::new(apply_order, rules), stage_name)
71 }
72
73 pub(crate) fn optimize_by_rules_until_fix_point(
74 mut self,
75 OptimizationStage {
76 stage_name,
77 rules,
78 apply_order,
79 }: &OptimizationStage<C>,
80 ) -> Result<PlanRef<C>> {
81 loop {
82 let mut heuristic_optimizer = HeuristicOptimizer::new(apply_order, rules);
83 self = self.optimize_by_rules_inner(&mut heuristic_optimizer, stage_name)?;
84 if !heuristic_optimizer.get_stats().has_applied_rule() {
85 return Ok(self);
86 }
87 }
88 }
89}
90
91pub struct OptimizationStage<C: ConventionMarker = Logical> {
92 stage_name: String,
93 rules: Vec<BoxedRule<C>>,
94 apply_order: ApplyOrder,
95}
96
97impl<C: ConventionMarker> OptimizationStage<C> {
98 pub fn new<S>(name: S, rules: Vec<BoxedRule<C>>, apply_order: ApplyOrder) -> Self
99 where
100 S: Into<String>,
101 {
102 OptimizationStage {
103 stage_name: name.into(),
104 rules,
105 apply_order,
106 }
107 }
108}
109
110use std::collections::{HashMap, HashSet};
111use std::sync::LazyLock;
112
113use risingwave_common::id::ObjectId;
114use risingwave_pb::common::PbObjectType;
115use risingwave_sqlparser::ast::Statement;
116
117use crate::optimizer::plan_node::generic::GenericPlanRef;
118use crate::optimizer::plan_visitor::RelationCollectorVisitor;
119use crate::session::SessionImpl;
120
/// Field-less type whose associated functions drive the logical optimization passes.
pub struct LogicalOptimizer {}
122
123static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
124 OptimizationStage::new(
125 "DAG To Tree",
126 vec![DagToTreeRule::create()],
127 ApplyOrder::TopDown,
128 )
129});
130
131static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
132 OptimizationStage::new(
133 "Convert GENERATE_SERIES Ends With NOW",
134 vec![GenerateSeriesWithNowRule::create()],
135 ApplyOrder::TopDown,
136 )
137});
138
139static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
140 OptimizationStage::new(
141 "Table Function Convert",
142 vec![
143 TableFunctionToFileScanRule::create(),
145 TableFunctionToInternalBackfillProgressRule::create(),
147 TableFunctionToInternalSourceBackfillProgressRule::create(),
149 TableFunctionToInternalGetChannelDeltaStatsRule::create(),
151 TableFunctionToPostgresQueryRule::create(),
153 TableFunctionToMySqlQueryRule::create(),
155 TableFunctionToProjectSetRule::create(),
157 ],
158 ApplyOrder::TopDown,
159 )
160});
161
162static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
163 OptimizationStage::new(
164 "Table Function To FileScan",
165 vec![TableFunctionToFileScanRule::create()],
166 ApplyOrder::TopDown,
167 )
168});
169
170static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
171 OptimizationStage::new(
172 "Table Function To PostgresQuery",
173 vec![TableFunctionToPostgresQueryRule::create()],
174 ApplyOrder::TopDown,
175 )
176});
177
178static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
179 OptimizationStage::new(
180 "Table Function To MySQL",
181 vec![TableFunctionToMySqlQueryRule::create()],
182 ApplyOrder::TopDown,
183 )
184});
185
186static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
187 LazyLock::new(|| {
188 OptimizationStage::new(
189 "Table Function To Internal Backfill Progress",
190 vec![TableFunctionToInternalBackfillProgressRule::create()],
191 ApplyOrder::TopDown,
192 )
193 });
194
195static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
196 LazyLock::new(|| {
197 OptimizationStage::new(
198 "Table Function To Internal Source Backfill Progress",
199 vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
200 ApplyOrder::TopDown,
201 )
202 });
203
204static TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS: LazyLock<OptimizationStage> =
205 LazyLock::new(|| {
206 OptimizationStage::new(
207 "Table Function To Internal Get Channel Delta Stats",
208 vec![TableFunctionToInternalGetChannelDeltaStatsRule::create()],
209 ApplyOrder::TopDown,
210 )
211 });
212
213static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
214 OptimizationStage::new(
215 "Values Extract Project",
216 vec![ValuesExtractProjectRule::create()],
217 ApplyOrder::TopDown,
218 )
219});
220
221static SIMPLE_UNNESTING: LazyLock<OptimizationStage> = LazyLock::new(|| {
222 OptimizationStage::new(
223 "Simple Unnesting",
224 vec![
225 PullUpCorrelatedPredicateRule::create(),
227 PullUpCorrelatedProjectValueRule::create(),
229 PullUpCorrelatedPredicateAggRule::create(),
230 MaxOneRowEliminateRule::create(),
232 ApplyTableFunctionToProjectSetRule::create(),
234 ApplyToJoinRule::create(),
236 ],
237 ApplyOrder::BottomUp,
238 )
239});
240
241static SET_OPERATION_MERGE: LazyLock<OptimizationStage> = LazyLock::new(|| {
242 OptimizationStage::new(
243 "Set Operation Merge",
244 vec![
245 UnionMergeRule::create(),
246 IntersectMergeRule::create(),
247 ExceptMergeRule::create(),
248 ],
249 ApplyOrder::BottomUp,
250 )
251});
252
253static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
254 LazyLock::new(|| {
255 OptimizationStage::new(
256 "General Unnesting(Translate Apply)",
257 vec![TranslateApplyRule::create(true)],
258 ApplyOrder::TopDown,
259 )
260 });
261
262static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage> =
263 LazyLock::new(|| {
264 OptimizationStage::new(
265 "General Unnesting(Translate Apply)",
266 vec![TranslateApplyRule::create(false)],
267 ApplyOrder::TopDown,
268 )
269 });
270
271static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock::new(|| {
272 OptimizationStage::new(
273 "General Unnesting(Push Down Apply)",
274 vec![
275 ApplyEliminateRule::create(),
276 ApplyAggTransposeRule::create(),
277 ApplyDedupTransposeRule::create(),
278 ApplyFilterTransposeRule::create(),
279 ApplyProjectTransposeRule::create(),
280 ApplyProjectSetTransposeRule::create(),
281 ApplyTopNTransposeRule::create(),
282 ApplyLimitTransposeRule::create(),
283 ApplyJoinTransposeRule::create(),
284 ApplyUnionTransposeRule::create(),
285 ApplyOverWindowTransposeRule::create(),
286 ApplyExpandTransposeRule::create(),
287 ApplyHopWindowTransposeRule::create(),
288 CrossJoinEliminateRule::create(),
289 ApplyShareEliminateRule::create(),
290 ],
291 ApplyOrder::TopDown,
292 )
293});
294
295static TO_MULTI_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
296 OptimizationStage::new(
297 "To MultiJoin",
298 vec![MergeMultiJoinRule::create()],
299 ApplyOrder::TopDown,
300 )
301});
302
303static LEFT_DEEP_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
304 OptimizationStage::new(
305 "Join Ordering".to_owned(),
306 vec![LeftDeepTreeJoinOrderingRule::create()],
307 ApplyOrder::TopDown,
308 )
309});
310
311static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(|| {
312 OptimizationStage::new(
313 "Join Ordering".to_owned(),
314 vec![BushyTreeJoinOrderingRule::create()],
315 ApplyOrder::TopDown,
316 )
317});
318
319static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
320 OptimizationStage::new(
321 "Push down filter with now into a left semijoin",
322 vec![
323 SplitNowAndRule::create(),
324 SplitNowOrRule::create(),
325 FilterWithNowToJoinRule::create(),
326 ],
327 ApplyOrder::TopDown,
328 )
329});
330
331static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
332 OptimizationStage::new(
333 "Push down the calculation of inputs of join's condition",
334 vec![PushCalculationOfJoinRule::create()],
335 ApplyOrder::TopDown,
336 )
337});
338
339static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
340 OptimizationStage::new(
341 "Convert Distinct Aggregation",
342 vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
343 ApplyOrder::TopDown,
344 )
345});
346
347static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
348 OptimizationStage::new(
349 "Convert Distinct Aggregation",
350 vec![
351 UnionToDistinctRule::create(),
352 DistinctAggRule::create(false),
353 ],
354 ApplyOrder::TopDown,
355 )
356});
357
358static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
359 OptimizationStage::new(
360 "Simplify Aggregation",
361 vec![
362 AggGroupBySimplifyRule::create(),
363 AggCallMergeRule::create(),
364 UnifyFirstLastValueRule::create(),
365 ],
366 ApplyOrder::TopDown,
367 )
368});
369
370static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
371 OptimizationStage::new(
372 "Join Commute".to_owned(),
373 vec![JoinCommuteRule::create()],
374 ApplyOrder::TopDown,
375 )
376});
377
378static CONSTANT_OUTPUT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
379 OptimizationStage::new(
380 "Constant Output Operator Remove",
381 vec![EmptyAggRemoveRule::create()],
382 ApplyOrder::TopDown,
383 )
384});
385
386static PROJECT_REMOVE: LazyLock<OptimizationStage> = LazyLock::new(|| {
387 OptimizationStage::new(
388 "Project Remove",
389 vec![
390 ProjectMergeRule::create(),
392 ProjectEliminateRule::create(),
393 TrivialProjectToValuesRule::create(),
394 UnionInputValuesMergeRule::create(),
395 JoinProjectTransposeRule::create(),
396 ProjectJoinMergeRule::create(),
399 AggProjectMergeRule::create(),
400 ],
401 ApplyOrder::BottomUp,
402 )
403});
404
405static SPLIT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
406 OptimizationStage::new(
407 "Split Over Window",
408 vec![OverWindowSplitRule::create()],
409 ApplyOrder::TopDown,
410 )
411});
412
413static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
417 OptimizationStage::new(
418 "Convert Over Window",
419 vec![
420 ProjectMergeRule::create(),
421 ProjectEliminateRule::create(),
422 TrivialProjectToValuesRule::create(),
423 UnionInputValuesMergeRule::create(),
424 OverWindowToAggAndJoinRule::create(),
425 OverWindowToTopNRule::create(),
426 ],
427 ApplyOrder::TopDown,
428 )
429});
430
431static CONVERT_OVER_WINDOW_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
433 OptimizationStage::new(
434 "Convert Over Window",
435 vec![
436 ProjectMergeRule::create(),
437 ProjectEliminateRule::create(),
438 TrivialProjectToValuesRule::create(),
439 UnionInputValuesMergeRule::create(),
440 OverWindowToAggAndJoinRule::create(),
441 ],
442 ApplyOrder::TopDown,
443 )
444});
445
446static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
447 OptimizationStage::new(
448 "Merge Over Window",
449 vec![OverWindowMergeRule::create()],
450 ApplyOrder::TopDown,
451 )
452});
453
454static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
455 OptimizationStage::new(
456 "Rewrite Like Expr",
457 vec![RewriteLikeExprRule::create()],
458 ApplyOrder::TopDown,
459 )
460});
461
462static TOP_N_AGG_ON_INDEX: LazyLock<OptimizationStage> = LazyLock::new(|| {
463 OptimizationStage::new(
464 "TopN/SimpleAgg on Index",
465 vec![
466 TopNProjectTransposeRule::create(),
467 TopNOnIndexRule::create(),
468 MinMaxOnIndexRule::create(),
469 ],
470 ApplyOrder::TopDown,
471 )
472});
473
474static PROJECT_TOP_N_TRANSPOSE: LazyLock<OptimizationStage> = LazyLock::new(|| {
475 OptimizationStage::new(
476 "Project TopN Transpose",
477 vec![ProjectTopNTransposeRule::create()],
478 ApplyOrder::TopDown,
479 )
480});
481
482static ALWAYS_FALSE_FILTER: LazyLock<OptimizationStage> = LazyLock::new(|| {
483 OptimizationStage::new(
484 "Void always-false filter's downstream",
485 vec![AlwaysFalseFilterRule::create()],
486 ApplyOrder::TopDown,
487 )
488});
489
490static LIMIT_PUSH_DOWN: LazyLock<OptimizationStage> = LazyLock::new(|| {
491 OptimizationStage::new(
492 "Push Down Limit",
493 vec![LimitPushDownRule::create()],
494 ApplyOrder::TopDown,
495 )
496});
497
498static PULL_UP_HOP: LazyLock<OptimizationStage> = LazyLock::new(|| {
499 OptimizationStage::new(
500 "Pull Up Hop",
501 vec![PullUpHopRule::create()],
502 ApplyOrder::BottomUp,
503 )
504});
505
506static SET_OPERATION_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
507 OptimizationStage::new(
508 "Set Operation To Join",
509 vec![
510 IntersectToSemiJoinRule::create(),
511 ExceptToAntiJoinRule::create(),
512 ],
513 ApplyOrder::BottomUp,
514 )
515});
516
517static GROUPING_SETS: LazyLock<OptimizationStage> = LazyLock::new(|| {
518 OptimizationStage::new(
519 "Grouping Sets",
520 vec![
521 GroupingSetsToExpandRule::create(),
522 ExpandToProjectRule::create(),
523 ],
524 ApplyOrder::TopDown,
525 )
526});
527
528static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
529 OptimizationStage::new(
530 "Common Sub Expression Extract",
531 vec![CommonSubExprExtractRule::create()],
532 ApplyOrder::TopDown,
533 )
534});
535
536static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLock::new(|| {
537 OptimizationStage::new(
538 "Logical Filter Expression Simplify",
539 vec![LogicalFilterExpressionSimplifyRule::create()],
540 ApplyOrder::TopDown,
541 )
542});
543
544static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
545 OptimizationStage::new(
546 "Rewrite Source For Batch",
547 vec![SourceToKafkaScanRule::create()],
548 ApplyOrder::TopDown,
549 )
550});
551
552static MATERIALIZE_ICEBERG_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
553 OptimizationStage::new(
554 "Materialize Iceberg Scan",
555 vec![
556 IcebergEngineStorageSelectionRule::create(),
558 IcebergIntermediateScanRule::create(),
561 ],
562 ApplyOrder::TopDown,
563 )
564});
565
566static ICEBERG_COUNT_STAR: LazyLock<OptimizationStage> = LazyLock::new(|| {
567 OptimizationStage::new(
568 "Iceberg Count Star Optimization",
569 vec![IcebergCountStarRule::create()],
570 ApplyOrder::BottomUp,
571 )
572});
573
574static TOP_N_TO_VECTOR_SEARCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
575 OptimizationStage::new(
576 "TopN to Vector Search",
577 vec![TopNToVectorSearchRule::create()],
578 ApplyOrder::BottomUp,
579 )
580});
581
582static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH: LazyLock<OptimizationStage> =
583 LazyLock::new(|| {
584 OptimizationStage::new(
585 "Correlated TopN to Vector Search",
586 vec![CorrelatedTopNToVectorSearchRule::create(true)],
587 ApplyOrder::BottomUp,
588 )
589 });
590
591static CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM: LazyLock<OptimizationStage> =
592 LazyLock::new(|| {
593 OptimizationStage::new(
594 "Correlated TopN to Vector Search",
595 vec![CorrelatedTopNToVectorSearchRule::create(false)],
596 ApplyOrder::BottomUp,
597 )
598 });
599
600static BATCH_MV_SELECTION: LazyLock<OptimizationStage> = LazyLock::new(|| {
601 OptimizationStage::new(
602 "Batch Mv Selection",
603 vec![MvSelectionRule::create()],
604 ApplyOrder::TopDown,
605 )
606});
607
608impl LogicalOptimizer {
609 pub fn predicate_pushdown(
610 plan: LogicalPlanRef,
611 explain_trace: bool,
612 ctx: &OptimizerContextRef,
613 ) -> LogicalPlanRef {
614 let plan = plan.predicate_pushdown(
615 Condition::true_cond(),
616 &mut PredicatePushdownContext::new(plan.clone()),
617 );
618 if explain_trace {
619 ctx.trace("Predicate Push Down:");
620 ctx.trace(plan.explain_to_string());
621 }
622 plan
623 }
624
625 pub fn subquery_unnesting(
626 mut plan: LogicalPlanRef,
627 enable_share_plan: bool,
628 explain_trace: bool,
629 ctx: &OptimizerContextRef,
630 ) -> Result<LogicalPlanRef> {
631 if !has_logical_apply(plan.clone()) {
633 return Ok(plan);
634 }
635 plan = plan.optimize_by_rules(&SIMPLE_UNNESTING)?;
637 debug_assert!(!HasMaxOneRowApply().visit(plan.clone()));
638 plan = Self::predicate_pushdown(plan, explain_trace, ctx);
641 plan = plan.optimize_by_rules(&VALUES_EXTRACT_PROJECT)?;
643 plan = if enable_share_plan {
647 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE)?
648 } else {
649 plan.optimize_by_rules(&GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE)?
650 };
651 plan = plan.optimize_by_rules_until_fix_point(&GENERAL_UNNESTING_PUSH_DOWN_APPLY)?;
652
653 plan.check_apply_elimination()?;
655
656 Ok(plan)
657 }
658
659 pub fn column_pruning(
660 mut plan: LogicalPlanRef,
661 explain_trace: bool,
662 ctx: &OptimizerContextRef,
663 ) -> LogicalPlanRef {
664 let required_cols = (0..plan.schema().len()).collect_vec();
665 let mut column_pruning_ctx = ColumnPruningContext::new(plan.clone());
666 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
667 if explain_trace {
669 ctx.trace("Prune Columns:");
670 ctx.trace(plan.explain_to_string());
671 }
672
673 if column_pruning_ctx.need_second_round() {
674 plan = plan.prune_col(&required_cols, &mut column_pruning_ctx);
677 if explain_trace {
678 ctx.trace("Prune Columns (For DAG):");
679 ctx.trace(plan.explain_to_string());
680 }
681 }
682 plan
683 }
684
685 pub fn inline_now_proc_time(plan: LogicalPlanRef, ctx: &OptimizerContextRef) -> LogicalPlanRef {
686 let mut v = NowProcTimeFinder::default();
688 plan.visit_exprs_recursive(&mut v);
689 if !v.has() {
690 return plan;
691 }
692
693 let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();
694
695 let plan = plan.rewrite_exprs_recursive(&mut v);
696
697 if ctx.is_explain_trace() {
698 ctx.trace("Inline Now and ProcTime:");
699 ctx.trace(plan.explain_to_string());
700 }
701 plan
702 }
703
704 pub fn gen_optimized_logical_plan_for_stream(
705 mut plan: LogicalPlanRef,
706 ) -> Result<LogicalPlanRef> {
707 let ctx = plan.ctx();
708 let explain_trace = ctx.is_explain_trace();
709
710 if explain_trace {
711 ctx.trace("Begin:");
712 ctx.trace(plan.explain_to_string());
713 }
714
715 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
717 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
719 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
721
722 let enable_share_plan = ctx.session_ctx().config().enable_share_plan();
726 if enable_share_plan {
727 plan = plan.common_subplan_sharing();
729 plan = plan.prune_share();
730 if explain_trace {
731 ctx.trace("Common Sub-plan Sharing:");
732 ctx.trace(plan.explain_to_string());
733 }
734 } else {
735 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
736
737 plan = ShareSourceRewriter::share_source(plan);
741 if explain_trace {
742 ctx.trace("Share Source:");
743 ctx.trace(plan.explain_to_string());
744 }
745 }
746 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
747 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
748 plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW)?;
751 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
753
754 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_STREAM)?;
755
756 plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
757 if has_logical_max_one_row(plan.clone()) {
758 bail!("Scalar subquery might produce more than one row.");
762 }
763
764 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
767
768 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
770
771 if plan.ctx().session_ctx().config().enable_join_ordering() {
772 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
776
777 if plan
779 .ctx()
780 .session_ctx()
781 .config()
782 .streaming_enable_bushy_join()
783 {
784 plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING)?;
785 } else {
786 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
787 }
788 }
789
790 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
793
794 plan = plan.optimize_by_rules(&FILTER_WITH_NOW_TO_JOIN)?;
796
797 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
799
800 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
801 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
804 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW)?;
805 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
806
807 let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg();
808 plan = if force_split_distinct_agg {
811 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?
812 } else {
813 plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)?
814 };
815
816 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
817
818 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
819
820 plan = Self::column_pruning(plan, explain_trace, &ctx);
822 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
823
824 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
825 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
826
827 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
828
829 #[cfg(debug_assertions)]
830 InputRefValidator.validate(plan.clone());
831
832 ctx.may_store_explain_logical(&plan);
833
834 Ok(plan)
835 }
836
837 pub fn gen_optimized_logical_plan_for_batch(
838 mut plan: LogicalPlanRef,
839 ) -> Result<LogicalPlanRef> {
840 let ctx = plan.ctx();
841 let explain_trace = ctx.is_explain_trace();
842
843 if explain_trace {
844 ctx.trace("Begin:");
845 ctx.trace(plan.explain_to_string());
846 }
847
848 if ctx.session_ctx().config().enable_mv_selection() {
849 let query_relations =
850 RelationCollectorVisitor::collect_with(HashSet::new(), plan.clone());
851 Self::register_batch_mview_candidates(ctx.session_ctx(), &ctx, &query_relations);
852 plan = plan.optimize_by_rules(&BATCH_MV_SELECTION)?;
853 }
854
855 plan = Self::inline_now_proc_time(plan, &ctx);
857
858 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
860
861 plan = plan.optimize_by_rules(&REWRITE_SOURCE_FOR_BATCH)?;
862 plan = plan.optimize_by_rules(&GROUPING_SETS)?;
863 plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR)?;
864 plan = plan.optimize_by_rules(&SET_OPERATION_MERGE)?;
865 plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN)?;
866 plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER)?;
867 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
869 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
870 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
871 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
872 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_GET_CHANNEL_DELTA_STATS)?;
873 plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
874 plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;
876
877 plan = plan.optimize_by_rules(&CORRELATED_TOP_N_TO_VECTOR_SEARCH_FOR_BATCH)?;
878
879 plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;
880
881 plan = plan.optimize_by_rules(&LOGICAL_FILTER_EXPRESSION_SIMPLIFY)?;
885
886 let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
888 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
889
890 if plan.ctx().session_ctx().config().enable_join_ordering() {
891 plan = plan.optimize_by_rules(&TO_MULTI_JOIN)?;
895
896 plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_ORDERING)?;
898 }
899
900 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
903 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
904 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
905 }
906
907 plan = plan.optimize_by_rules(&PUSH_CALC_OF_JOIN)?;
909
910 plan = plan.optimize_by_rules(&SPLIT_OVER_WINDOW)?;
911 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
914 last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied();
915 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
916 }
917 plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW_FOR_BATCH)?;
918 plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW)?;
919
920 plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH)?;
922
923 plan = plan.optimize_by_rules(&SIMPLIFY_AGG)?;
924
925 plan = plan.optimize_by_rules(&JOIN_COMMUTE)?;
926
927 plan = plan.optimize_by_rules(&TOP_N_TO_VECTOR_SEARCH)?;
928
929 plan = Self::column_pruning(plan, explain_trace, &ctx);
931 if last_total_rule_applied_before_predicate_pushdown != ctx.total_rule_applied() {
932 (#[allow(unused_assignments)]
933 last_total_rule_applied_before_predicate_pushdown) = ctx.total_rule_applied();
934 plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
935 }
936
937 plan = plan.optimize_by_rules(&MATERIALIZE_ICEBERG_SCAN)?;
939
940 plan = plan.optimize_by_rules(&CONSTANT_OUTPUT_REMOVE)?;
941 plan = plan.optimize_by_rules(&PROJECT_REMOVE)?;
942
943 plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT)?;
944
945 plan = plan.optimize_by_rules(&ICEBERG_COUNT_STAR)?;
947
948 plan = plan.optimize_by_rules(&PULL_UP_HOP)?;
949
950 plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX)?;
951
952 plan = plan.optimize_by_rules(&PROJECT_TOP_N_TRANSPOSE)?;
953
954 plan = plan.optimize_by_rules(&LIMIT_PUSH_DOWN)?;
955
956 plan = plan.optimize_by_rules(&DAG_TO_TREE)?;
957
958 #[cfg(debug_assertions)]
959 InputRefValidator.validate(plan.clone());
960
961 ctx.may_store_explain_logical(&plan);
962
963 Ok(plan)
964 }
965
966 fn register_batch_mview_candidates(
967 session: &SessionImpl,
968 context: &OptimizerContextRef,
969 query_relations: &HashSet<ObjectId>,
970 ) {
971 let catalog_reader = session.env().catalog_reader().read_guard();
972 let user_reader = session.env().user_info_reader().read_guard();
973 let Some(current_user) = user_reader.get_user_by_name(&session.user_name()) else {
974 return;
975 };
976 let mut mv_dependencies: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
977 let mut mviews_with_source_dependency: HashSet<ObjectId> = HashSet::new();
978 for dep in catalog_reader.iter_object_dependencies() {
979 mv_dependencies
980 .entry(dep.object_id)
981 .or_default()
982 .insert(dep.referenced_object_id);
983 if dep.referenced_object_type == PbObjectType::Source {
984 mviews_with_source_dependency.insert(dep.object_id);
985 }
986 }
987 let db_name = session.database();
988 let Ok(schemas) = catalog_reader.iter_schemas(&db_name) else {
989 return;
990 };
991
992 for schema in schemas {
993 for mv in schema.iter_created_mvs_with_acl(current_user) {
994 let mv_object_id = mv.id().as_object_id();
995 if mviews_with_source_dependency.contains(&mv_object_id) {
996 continue;
997 }
998 let is_subset = mv_dependencies
999 .get(&mv_object_id)
1000 .is_some_and(|deps| deps.is_subset(query_relations));
1001 if !is_subset {
1002 continue;
1003 }
1004 let Ok(stmt) = mv.create_sql_ast() else {
1005 continue;
1006 };
1007 let Statement::CreateView {
1008 materialized: true,
1009 query,
1010 ..
1011 } = stmt
1012 else {
1013 continue;
1014 };
1015 let mut binder = Binder::new_for_batch(session);
1016 let Ok(bound_query) = binder.bind_query(&query) else {
1017 continue;
1018 };
1019 let mut planner = Planner::new_for_batch_dql(context.clone());
1020 let Ok(plan_root) = planner.plan_query(bound_query) else {
1021 continue;
1022 };
1023 context.add_batch_mview_candidate(mv.clone(), plan_root.plan.clone());
1024 }
1025 }
1026 }
1027}