risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
18
19use super::prelude::{PlanRef, *};
20use crate::optimizer::plan_node::{LogicalProject, LogicalScan, LogicalTopN, PlanTreeNodeUnary};
21use crate::optimizer::property::Order;
22
23pub struct TopNOnIndexRule {}
24
25impl Rule<Logical> for TopNOnIndexRule {
26 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27 let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
28 let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
29 let order = logical_top_n.topn_order();
30 if order.column_orders.is_empty() {
31 return None;
32 }
33 if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
34 Some(p)
35 } else {
36 self.try_on_index(logical_top_n, logical_scan, order)
37 }
38 }
39}
40
41impl TopNOnIndexRule {
42 pub fn create() -> BoxedRule {
43 Box::new(TopNOnIndexRule {})
44 }
45
46 fn try_on_index(
47 &self,
48 logical_top_n: &LogicalTopN,
49 logical_scan: LogicalScan,
50 required_order: &Order,
51 ) -> Option<PlanRef> {
52 let (scan_for_index, output_len, _output_col_map, prefix) =
53 Self::prepare_scan_for_predicate(logical_scan);
54 let scan_output_len = scan_for_index.output_col_idx().len();
55 let order_satisfied_index =
56 scan_for_index.indexes_satisfy_order_with_prefix(required_order, &prefix);
57 let mut longest_prefix: Option<Order> = None;
58 let mut selected_index = None;
59 for (index, prefix) in order_satisfied_index {
60 if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
61 longest_prefix = Some(prefix.clone());
62 if let Some(index_scan) = scan_for_index.to_index_scan_if_index_covered(index) {
63 selected_index = Some(Self::finish_top_n(
64 logical_top_n,
65 index_scan.into(),
66 prefix,
67 output_len,
68 scan_output_len,
69 ));
70 }
71 }
72 }
73 selected_index
74 }
75
76 fn try_on_pk(
77 &self,
78 logical_top_n: &LogicalTopN,
79 logical_scan: LogicalScan,
80 order: &Order,
81 ) -> Option<PlanRef> {
82 let (scan_for_pk, output_len, output_col_map, prefix) =
83 Self::prepare_scan_for_predicate(logical_scan);
84 let scan_output_len = scan_for_pk.output_col_idx().len();
85 let unmatched_idx = output_col_map.len();
86 let primary_key = scan_for_pk.primary_key();
87 let primary_key_order = Order {
88 column_orders: primary_key
89 .iter()
90 .map(|o| {
91 ColumnOrder::new(
92 *output_col_map
93 .get(&o.column_index)
94 .unwrap_or(&unmatched_idx),
95 o.order_type,
96 )
97 })
98 .collect::<Vec<_>>(),
99 };
100 let mut pk_orders_iter = primary_key_order.column_orders.iter().cloned().peekable();
101 let fixed_prefix = {
102 let mut fixed_prefix = vec![];
103 loop {
104 match pk_orders_iter.peek() {
105 Some(order) if prefix.contains(order) => {
106 let order = pk_orders_iter.next().unwrap();
107 fixed_prefix.push(order);
108 }
109 _ => break,
110 }
111 }
112 Order {
113 column_orders: fixed_prefix,
114 }
115 };
116 let remaining_orders = Order {
117 column_orders: pk_orders_iter.collect(),
118 };
119 if !remaining_orders.satisfies(order) {
120 return None;
121 }
122 Some(Self::finish_top_n(
123 logical_top_n,
124 scan_for_pk.into(),
125 fixed_prefix,
126 output_len,
127 scan_output_len,
128 ))
129 }
130
131 fn prepare_scan_for_predicate(
132 logical_scan: LogicalScan,
133 ) -> (
134 LogicalScan,
135 usize,
136 BTreeMap<usize, usize>,
137 HashSet<ColumnOrder>,
138 ) {
139 let output_len = logical_scan.output_col_idx().len();
140 let scan = if logical_scan.output_col_idx() == logical_scan.required_col_idx() {
141 logical_scan
142 } else {
143 logical_scan.clone_with_output_indices(logical_scan.required_col_idx().clone())
146 };
147 let output_col_map = Self::output_col_map(&scan);
148 let prefix = Self::build_eq_const_prefix(&scan, &output_col_map);
149 (scan, output_len, output_col_map, prefix)
150 }
151
152 fn output_col_map(scan: &LogicalScan) -> BTreeMap<usize, usize> {
153 scan.output_col_idx()
154 .iter()
155 .cloned()
156 .enumerate()
157 .map(|(id, col)| (col, id))
158 .collect()
159 }
160
161 fn build_eq_const_prefix(
162 scan: &LogicalScan,
163 output_col_map: &BTreeMap<usize, usize>,
164 ) -> HashSet<ColumnOrder> {
165 let input_refs = scan.predicate().get_eq_const_input_refs();
166 input_refs
167 .into_iter()
168 .flat_map(|input_ref| {
169 output_col_map
170 .get(&input_ref.index)
171 .into_iter()
172 .flat_map(|&output_idx| {
173 OrderType::all()
174 .into_iter()
175 .map(move |order_type| ColumnOrder {
176 column_index: output_idx,
177 order_type,
178 })
179 })
180 })
181 .collect()
182 }
183
184 fn finish_top_n(
185 logical_top_n: &LogicalTopN,
186 input: PlanRef,
187 prefix: Order,
188 output_len: usize,
189 scan_output_len: usize,
190 ) -> PlanRef {
191 let top_n = logical_top_n.clone_with_input_and_prefix(input, prefix);
192 if scan_output_len == output_len {
193 top_n.into()
194 } else {
195 LogicalProject::with_out_col_idx(top_n.into(), 0..output_len).into()
196 }
197 }
198}