risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs1use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::{BoxedRule, Rule};
25use crate::optimizer::PlanRef;
26use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
27use crate::optimizer::property::Order;
28
29pub struct TopNOnIndexRule {}
30
31impl Rule for TopNOnIndexRule {
32 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
33 let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
34 let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
35 let order = logical_top_n.topn_order();
36 if order.column_orders.is_empty() {
37 return None;
38 }
39 if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
40 Some(p)
41 } else {
42 self.try_on_index(logical_top_n, logical_scan, order)
43 }
44 }
45}
46
47impl TopNOnIndexRule {
48 pub fn create() -> BoxedRule {
49 Box::new(TopNOnIndexRule {})
50 }
51
52 fn try_on_index(
53 &self,
54 logical_top_n: &LogicalTopN,
55 logical_scan: LogicalScan,
56 required_order: &Order,
57 ) -> Option<PlanRef> {
58 let scan_predicates = logical_scan.predicate();
59 let input_refs = scan_predicates.get_eq_const_input_refs();
60 let prefix = input_refs
61 .into_iter()
62 .map(|input_ref| ColumnOrder {
63 column_index: input_ref.index,
64 order_type: OrderType::ascending(),
65 })
66 .collect();
67 let order_satisfied_index =
68 logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
69 let mut longest_prefix: Option<Order> = None;
70 let mut selected_index = None;
71 for (index, prefix) in order_satisfied_index {
72 if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
73 longest_prefix = Some(prefix.clone());
74 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
75 selected_index = Some(
76 logical_top_n
77 .clone_with_input_and_prefix(index_scan.into(), prefix)
78 .into(),
79 );
80 }
81 }
82 }
83 selected_index
84 }
85
86 fn try_on_pk(
87 &self,
88 logical_top_n: &LogicalTopN,
89 logical_scan: LogicalScan,
90 order: &Order,
91 ) -> Option<PlanRef> {
92 let output_col_map = logical_scan
93 .output_col_idx()
94 .iter()
95 .cloned()
96 .enumerate()
97 .map(|(id, col)| (col, id))
98 .collect::<BTreeMap<_, _>>();
99 let unmatched_idx = output_col_map.len();
100 let primary_key = logical_scan.primary_key();
101 let primary_key_order = Order {
102 column_orders: primary_key
103 .iter()
104 .map(|o| {
105 ColumnOrder::new(
106 *output_col_map
107 .get(&o.column_index)
108 .unwrap_or(&unmatched_idx),
109 o.order_type,
110 )
111 })
112 .collect::<Vec<_>>(),
113 };
114 if primary_key_order.satisfies(order) {
115 Some(logical_top_n.clone_with_input(logical_scan.into()).into())
116 } else {
117 None
118 }
119 }
120}