risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::prelude::{PlanRef, *};
25use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
26use crate::optimizer::property::Order;
27
/// Optimizer rule that matches a `LogicalTopN` sitting directly on a `LogicalScan` and
/// tries to rewrite it so the TopN can rely on the ordering of the table's primary key
/// or of one of its indexes.
pub struct TopNOnIndexRule {}
29
30impl Rule<Logical> for TopNOnIndexRule {
31    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
32        let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
33        let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
34        let order = logical_top_n.topn_order();
35        if order.column_orders.is_empty() {
36            return None;
37        }
38        if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
39            Some(p)
40        } else {
41            self.try_on_index(logical_top_n, logical_scan, order)
42        }
43    }
44}
45
46impl TopNOnIndexRule {
47    pub fn create() -> BoxedRule {
48        Box::new(TopNOnIndexRule {})
49    }
50
51    fn try_on_index(
52        &self,
53        logical_top_n: &LogicalTopN,
54        logical_scan: LogicalScan,
55        required_order: &Order,
56    ) -> Option<PlanRef> {
57        let scan_predicates = logical_scan.predicate();
58        let input_refs = scan_predicates.get_eq_const_input_refs();
59        let prefix = input_refs
60            .into_iter()
61            .flat_map(|input_ref| {
62                [
63                    ColumnOrder {
64                        column_index: input_ref.index,
65                        order_type: OrderType::ascending_nulls_first(),
66                    },
67                    ColumnOrder {
68                        column_index: input_ref.index,
69                        order_type: OrderType::ascending_nulls_last(),
70                    },
71                    ColumnOrder {
72                        column_index: input_ref.index,
73                        order_type: OrderType::descending_nulls_first(),
74                    },
75                    ColumnOrder {
76                        column_index: input_ref.index,
77                        order_type: OrderType::descending_nulls_last(),
78                    },
79                ]
80            })
81            .collect();
82        let order_satisfied_index =
83            logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
84        let mut longest_prefix: Option<Order> = None;
85        let mut selected_index = None;
86        for (index, prefix) in order_satisfied_index {
87            if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
88                longest_prefix = Some(prefix.clone());
89                if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
90                    selected_index = Some(
91                        logical_top_n
92                            .clone_with_input_and_prefix(index_scan.into(), prefix)
93                            .into(),
94                    );
95                }
96            }
97        }
98        selected_index
99    }
100
101    fn try_on_pk(
102        &self,
103        logical_top_n: &LogicalTopN,
104        logical_scan: LogicalScan,
105        order: &Order,
106    ) -> Option<PlanRef> {
107        let output_col_map = logical_scan
108            .output_col_idx()
109            .iter()
110            .cloned()
111            .enumerate()
112            .map(|(id, col)| (col, id))
113            .collect::<BTreeMap<_, _>>();
114        let unmatched_idx = output_col_map.len();
115        let primary_key = logical_scan.primary_key();
116        let primary_key_order = Order {
117            column_orders: primary_key
118                .iter()
119                .map(|o| {
120                    ColumnOrder::new(
121                        *output_col_map
122                            .get(&o.column_index)
123                            .unwrap_or(&unmatched_idx),
124                        o.order_type,
125                    )
126                })
127                .collect::<Vec<_>>(),
128        };
129        if primary_key_order.satisfies(order) {
130            Some(logical_top_n.clone_with_input(logical_scan.into()).into())
131        } else {
132            None
133        }
134    }
135}