risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs1use std::collections::{BTreeMap, HashSet};
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::prelude::{PlanRef, *};
25use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
26use crate::optimizer::property::Order;
27
/// Optimizer rule that targets a `LogicalTopN` whose direct input is a `LogicalScan`.
///
/// When the TopN has a non-empty ordering, the rule first tries to serve it from the scan's
/// primary-key order and, failing that, from one of the scan's indexes. The struct itself
/// carries no state.
pub struct TopNOnIndexRule {}
29
30impl Rule<Logical> for TopNOnIndexRule {
31 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
32 let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
33 let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
34 let order = logical_top_n.topn_order();
35 if order.column_orders.is_empty() {
36 return None;
37 }
38 if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
39 Some(p)
40 } else {
41 self.try_on_index(logical_top_n, logical_scan, order)
42 }
43 }
44}
45
46impl TopNOnIndexRule {
47 pub fn create() -> BoxedRule {
48 Box::new(TopNOnIndexRule {})
49 }
50
51 fn try_on_index(
52 &self,
53 logical_top_n: &LogicalTopN,
54 logical_scan: LogicalScan,
55 required_order: &Order,
56 ) -> Option<PlanRef> {
57 let prefix = Self::build_prefix_from_scan_predicates(&logical_scan);
58 let order_satisfied_index =
59 logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
60 let mut longest_prefix: Option<Order> = None;
61 let mut selected_index = None;
62 for (index, prefix) in order_satisfied_index {
63 if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
64 longest_prefix = Some(prefix.clone());
65 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
66 selected_index = Some(
67 logical_top_n
68 .clone_with_input_and_prefix(index_scan.into(), prefix)
69 .into(),
70 );
71 }
72 }
73 }
74 selected_index
75 }
76
77 fn try_on_pk(
78 &self,
79 logical_top_n: &LogicalTopN,
80 logical_scan: LogicalScan,
81 order: &Order,
82 ) -> Option<PlanRef> {
83 let prefix = Self::build_prefix_from_scan_predicates(&logical_scan);
84 let output_col_map = logical_scan
85 .output_col_idx()
86 .iter()
87 .cloned()
88 .enumerate()
89 .map(|(id, col)| (col, id))
90 .collect::<BTreeMap<_, _>>();
91 let unmatched_idx = output_col_map.len();
92 let primary_key = logical_scan.primary_key();
93 let primary_key_order = Order {
94 column_orders: primary_key
95 .iter()
96 .map(|o| {
97 ColumnOrder::new(
98 *output_col_map
99 .get(&o.column_index)
100 .unwrap_or(&unmatched_idx),
101 o.order_type,
102 )
103 })
104 .collect::<Vec<_>>(),
105 };
106 let mut pk_orders_iter = primary_key_order.column_orders.iter().cloned().peekable();
107 let fixed_prefix = {
108 let mut fixed_prefix = vec![];
109 loop {
110 match pk_orders_iter.peek() {
111 Some(order) if prefix.contains(order) => {
112 let order = pk_orders_iter.next().unwrap();
113 fixed_prefix.push(order);
114 }
115 _ => break,
116 }
117 }
118 Order {
119 column_orders: fixed_prefix,
120 }
121 };
122 let remaining_orders = Order {
123 column_orders: pk_orders_iter.collect(),
124 };
125 if !remaining_orders.satisfies(order) {
126 return None;
127 }
128 Some(
129 logical_top_n
130 .clone_with_input_and_prefix(logical_scan.into(), fixed_prefix)
131 .into(),
132 )
133 }
134
135 fn build_prefix_from_scan_predicates(logical_scan: &LogicalScan) -> HashSet<ColumnOrder> {
136 let scan_predicates = logical_scan.predicate();
137 let input_refs = scan_predicates.get_eq_const_input_refs();
138 input_refs
139 .into_iter()
140 .flat_map(|input_ref| {
141 [
142 ColumnOrder {
143 column_index: input_ref.index,
144 order_type: OrderType::ascending_nulls_first(),
145 },
146 ColumnOrder {
147 column_index: input_ref.index,
148 order_type: OrderType::ascending_nulls_last(),
149 },
150 ColumnOrder {
151 column_index: input_ref.index,
152 order_type: OrderType::descending_nulls_first(),
153 },
154 ColumnOrder {
155 column_index: input_ref.index,
156 order_type: OrderType::descending_nulls_last(),
157 },
158 ]
159 })
160 .collect()
161 }
162}