risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs1use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::{BoxedRule, Rule};
25use crate::optimizer::PlanRef;
26use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
27use crate::optimizer::property::Order;
28
/// Optimizer rule that matches a `LogicalTopN` sitting directly on top of a
/// `LogicalScan` and tries to rewrite it so the TopN can rely on the ordering
/// of the table's primary key or of one of its indexes.
///
/// The rule carries no state; build it through `TopNOnIndexRule::create`.
pub struct TopNOnIndexRule {}
30
31impl Rule for TopNOnIndexRule {
32 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
33 let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
34 let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
35 let order = logical_top_n.topn_order();
36 if order.column_orders.is_empty() {
37 return None;
38 }
39 if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
40 Some(p)
41 } else {
42 self.try_on_index(logical_top_n, logical_scan, order)
43 }
44 }
45}
46
47impl TopNOnIndexRule {
48 pub fn create() -> BoxedRule {
49 Box::new(TopNOnIndexRule {})
50 }
51
52 fn try_on_index(
53 &self,
54 logical_top_n: &LogicalTopN,
55 logical_scan: LogicalScan,
56 required_order: &Order,
57 ) -> Option<PlanRef> {
58 let scan_predicates = logical_scan.predicate();
59 let input_refs = scan_predicates.get_eq_const_input_refs();
60 let prefix = input_refs
61 .into_iter()
62 .flat_map(|input_ref| {
63 [
64 ColumnOrder {
65 column_index: input_ref.index,
66 order_type: OrderType::ascending_nulls_first(),
67 },
68 ColumnOrder {
69 column_index: input_ref.index,
70 order_type: OrderType::ascending_nulls_last(),
71 },
72 ColumnOrder {
73 column_index: input_ref.index,
74 order_type: OrderType::descending_nulls_first(),
75 },
76 ColumnOrder {
77 column_index: input_ref.index,
78 order_type: OrderType::descending_nulls_last(),
79 },
80 ]
81 })
82 .collect();
83 let order_satisfied_index =
84 logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
85 let mut longest_prefix: Option<Order> = None;
86 let mut selected_index = None;
87 for (index, prefix) in order_satisfied_index {
88 if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
89 longest_prefix = Some(prefix.clone());
90 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
91 selected_index = Some(
92 logical_top_n
93 .clone_with_input_and_prefix(index_scan.into(), prefix)
94 .into(),
95 );
96 }
97 }
98 }
99 selected_index
100 }
101
102 fn try_on_pk(
103 &self,
104 logical_top_n: &LogicalTopN,
105 logical_scan: LogicalScan,
106 order: &Order,
107 ) -> Option<PlanRef> {
108 let output_col_map = logical_scan
109 .output_col_idx()
110 .iter()
111 .cloned()
112 .enumerate()
113 .map(|(id, col)| (col, id))
114 .collect::<BTreeMap<_, _>>();
115 let unmatched_idx = output_col_map.len();
116 let primary_key = logical_scan.primary_key();
117 let primary_key_order = Order {
118 column_orders: primary_key
119 .iter()
120 .map(|o| {
121 ColumnOrder::new(
122 *output_col_map
123 .get(&o.column_index)
124 .unwrap_or(&unmatched_idx),
125 o.order_type,
126 )
127 })
128 .collect::<Vec<_>>(),
129 };
130 if primary_key_order.satisfies(order) {
131 Some(logical_top_n.clone_with_input(logical_scan.into()).into())
132 } else {
133 None
134 }
135 }
136}