risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
18
19use super::prelude::{PlanRef, *};
20use crate::optimizer::plan_node::{LogicalProject, LogicalScan, LogicalTopN, PlanTreeNodeUnary};
21use crate::optimizer::property::Order;
22
/// Optimizer rule that matches a `LogicalTopN` whose direct input is a `LogicalScan` and
/// tries to rewrite it so the TopN runs over the table's primary key order or over an
/// index whose order satisfies the TopN's required order.
///
/// The struct carries no state; obtain an instance through [`TopNOnIndexRule::create`].
pub struct TopNOnIndexRule {}
24
25impl Rule<Logical> for TopNOnIndexRule {
26    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27        let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
28        let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
29        let order = logical_top_n.topn_order();
30        if order.column_orders.is_empty() {
31            return None;
32        }
33        if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
34            Some(p)
35        } else {
36            self.try_on_index(logical_top_n, logical_scan, order)
37        }
38    }
39}
40
41impl TopNOnIndexRule {
42    pub fn create() -> BoxedRule {
43        Box::new(TopNOnIndexRule {})
44    }
45
46    fn try_on_index(
47        &self,
48        logical_top_n: &LogicalTopN,
49        logical_scan: LogicalScan,
50        required_order: &Order,
51    ) -> Option<PlanRef> {
52        let (scan_for_index, output_len, _output_col_map, prefix) =
53            Self::prepare_scan_for_predicate(logical_scan);
54        let scan_output_len = scan_for_index.output_col_idx().len();
55        let order_satisfied_index =
56            scan_for_index.indexes_satisfy_order_with_prefix(required_order, &prefix);
57        let mut longest_prefix: Option<Order> = None;
58        let mut selected_index = None;
59        for (index, prefix) in order_satisfied_index {
60            if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
61                longest_prefix = Some(prefix.clone());
62                if let Some(index_scan) = scan_for_index.to_index_scan_if_index_covered(index) {
63                    selected_index = Some(Self::finish_top_n(
64                        logical_top_n,
65                        index_scan.into(),
66                        prefix,
67                        output_len,
68                        scan_output_len,
69                    ));
70                }
71            }
72        }
73        selected_index
74    }
75
76    fn try_on_pk(
77        &self,
78        logical_top_n: &LogicalTopN,
79        logical_scan: LogicalScan,
80        order: &Order,
81    ) -> Option<PlanRef> {
82        let (scan_for_pk, output_len, output_col_map, prefix) =
83            Self::prepare_scan_for_predicate(logical_scan);
84        let scan_output_len = scan_for_pk.output_col_idx().len();
85        let unmatched_idx = output_col_map.len();
86        let primary_key = scan_for_pk.primary_key();
87        let primary_key_order = Order {
88            column_orders: primary_key
89                .iter()
90                .map(|o| {
91                    ColumnOrder::new(
92                        *output_col_map
93                            .get(&o.column_index)
94                            .unwrap_or(&unmatched_idx),
95                        o.order_type,
96                    )
97                })
98                .collect::<Vec<_>>(),
99        };
100        let mut pk_orders_iter = primary_key_order.column_orders.iter().cloned().peekable();
101        let fixed_prefix = {
102            let mut fixed_prefix = vec![];
103            loop {
104                match pk_orders_iter.peek() {
105                    Some(order) if prefix.contains(order) => {
106                        let order = pk_orders_iter.next().unwrap();
107                        fixed_prefix.push(order);
108                    }
109                    _ => break,
110                }
111            }
112            Order {
113                column_orders: fixed_prefix,
114            }
115        };
116        let remaining_orders = Order {
117            column_orders: pk_orders_iter.collect(),
118        };
119        if !remaining_orders.satisfies(order) {
120            return None;
121        }
122        Some(Self::finish_top_n(
123            logical_top_n,
124            scan_for_pk.into(),
125            fixed_prefix,
126            output_len,
127            scan_output_len,
128        ))
129    }
130
131    fn prepare_scan_for_predicate(
132        logical_scan: LogicalScan,
133    ) -> (
134        LogicalScan,
135        usize,
136        BTreeMap<usize, usize>,
137        HashSet<ColumnOrder>,
138    ) {
139        let output_len = logical_scan.output_col_idx().len();
140        let scan = if logical_scan.output_col_idx() == logical_scan.required_col_idx() {
141            logical_scan
142        } else {
143            // `required_col_idx` is built by appending predicate columns to `output_col_idx`,
144            // so `output_col_idx` is always a prefix of `required_col_idx`.
145            logical_scan.clone_with_output_indices(logical_scan.required_col_idx().clone())
146        };
147        let output_col_map = Self::output_col_map(&scan);
148        let prefix = Self::build_eq_const_prefix(&scan, &output_col_map);
149        (scan, output_len, output_col_map, prefix)
150    }
151
152    fn output_col_map(scan: &LogicalScan) -> BTreeMap<usize, usize> {
153        scan.output_col_idx()
154            .iter()
155            .cloned()
156            .enumerate()
157            .map(|(id, col)| (col, id))
158            .collect()
159    }
160
161    fn build_eq_const_prefix(
162        scan: &LogicalScan,
163        output_col_map: &BTreeMap<usize, usize>,
164    ) -> HashSet<ColumnOrder> {
165        let input_refs = scan.predicate().get_eq_const_input_refs();
166        input_refs
167            .into_iter()
168            .flat_map(|input_ref| {
169                output_col_map
170                    .get(&input_ref.index)
171                    .into_iter()
172                    .flat_map(|&output_idx| {
173                        OrderType::all()
174                            .into_iter()
175                            .map(move |order_type| ColumnOrder {
176                                column_index: output_idx,
177                                order_type,
178                            })
179                    })
180            })
181            .collect()
182    }
183
184    fn finish_top_n(
185        logical_top_n: &LogicalTopN,
186        input: PlanRef,
187        prefix: Order,
188        output_len: usize,
189        scan_output_len: usize,
190    ) -> PlanRef {
191        let top_n = logical_top_n.clone_with_input_and_prefix(input, prefix);
192        if scan_output_len == output_len {
193            top_n.into()
194        } else {
195            LogicalProject::with_out_col_idx(top_n.into(), 0..output_len).into()
196        }
197    }
198}