risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs1use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::prelude::{PlanRef, *};
25use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
26use crate::optimizer::property::Order;
27
28pub struct TopNOnIndexRule {}
29
30impl Rule<Logical> for TopNOnIndexRule {
31 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
32 let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
33 let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
34 let order = logical_top_n.topn_order();
35 if order.column_orders.is_empty() {
36 return None;
37 }
38 if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
39 Some(p)
40 } else {
41 self.try_on_index(logical_top_n, logical_scan, order)
42 }
43 }
44}
45
46impl TopNOnIndexRule {
47 pub fn create() -> BoxedRule {
48 Box::new(TopNOnIndexRule {})
49 }
50
51 fn try_on_index(
52 &self,
53 logical_top_n: &LogicalTopN,
54 logical_scan: LogicalScan,
55 required_order: &Order,
56 ) -> Option<PlanRef> {
57 let scan_predicates = logical_scan.predicate();
58 let input_refs = scan_predicates.get_eq_const_input_refs();
59 let prefix = input_refs
60 .into_iter()
61 .flat_map(|input_ref| {
62 [
63 ColumnOrder {
64 column_index: input_ref.index,
65 order_type: OrderType::ascending_nulls_first(),
66 },
67 ColumnOrder {
68 column_index: input_ref.index,
69 order_type: OrderType::ascending_nulls_last(),
70 },
71 ColumnOrder {
72 column_index: input_ref.index,
73 order_type: OrderType::descending_nulls_first(),
74 },
75 ColumnOrder {
76 column_index: input_ref.index,
77 order_type: OrderType::descending_nulls_last(),
78 },
79 ]
80 })
81 .collect();
82 let order_satisfied_index =
83 logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
84 let mut longest_prefix: Option<Order> = None;
85 let mut selected_index = None;
86 for (index, prefix) in order_satisfied_index {
87 if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
88 longest_prefix = Some(prefix.clone());
89 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
90 selected_index = Some(
91 logical_top_n
92 .clone_with_input_and_prefix(index_scan.into(), prefix)
93 .into(),
94 );
95 }
96 }
97 }
98 selected_index
99 }
100
101 fn try_on_pk(
102 &self,
103 logical_top_n: &LogicalTopN,
104 logical_scan: LogicalScan,
105 order: &Order,
106 ) -> Option<PlanRef> {
107 let output_col_map = logical_scan
108 .output_col_idx()
109 .iter()
110 .cloned()
111 .enumerate()
112 .map(|(id, col)| (col, id))
113 .collect::<BTreeMap<_, _>>();
114 let unmatched_idx = output_col_map.len();
115 let primary_key = logical_scan.primary_key();
116 let primary_key_order = Order {
117 column_orders: primary_key
118 .iter()
119 .map(|o| {
120 ColumnOrder::new(
121 *output_col_map
122 .get(&o.column_index)
123 .unwrap_or(&unmatched_idx),
124 o.order_type,
125 )
126 })
127 .collect::<Vec<_>>(),
128 };
129 if primary_key_order.satisfies(order) {
130 Some(logical_top_n.clone_with_input(logical_scan.into()).into())
131 } else {
132 None
133 }
134 }
135}