risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::{BoxedRule, Rule};
25use crate::optimizer::PlanRef;
26use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
27use crate::optimizer::property::Order;
28
29pub struct TopNOnIndexRule {}
30
31impl Rule for TopNOnIndexRule {
32    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
33        let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
34        let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
35        let order = logical_top_n.topn_order();
36        if order.column_orders.is_empty() {
37            return None;
38        }
39        if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
40            Some(p)
41        } else {
42            self.try_on_index(logical_top_n, logical_scan, order)
43        }
44    }
45}
46
47impl TopNOnIndexRule {
48    pub fn create() -> BoxedRule {
49        Box::new(TopNOnIndexRule {})
50    }
51
52    fn try_on_index(
53        &self,
54        logical_top_n: &LogicalTopN,
55        logical_scan: LogicalScan,
56        required_order: &Order,
57    ) -> Option<PlanRef> {
58        let scan_predicates = logical_scan.predicate();
59        let input_refs = scan_predicates.get_eq_const_input_refs();
60        let prefix = input_refs
61            .into_iter()
62            .map(|input_ref| ColumnOrder {
63                column_index: input_ref.index,
64                order_type: OrderType::ascending(),
65            })
66            .collect();
67        let order_satisfied_index =
68            logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
69        let mut longest_prefix: Option<Order> = None;
70        let mut selected_index = None;
71        for (index, prefix) in order_satisfied_index {
72            if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
73                longest_prefix = Some(prefix.clone());
74                if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
75                    selected_index = Some(
76                        logical_top_n
77                            .clone_with_input_and_prefix(index_scan.into(), prefix)
78                            .into(),
79                    );
80                }
81            }
82        }
83        selected_index
84    }
85
86    fn try_on_pk(
87        &self,
88        logical_top_n: &LogicalTopN,
89        logical_scan: LogicalScan,
90        order: &Order,
91    ) -> Option<PlanRef> {
92        let output_col_map = logical_scan
93            .output_col_idx()
94            .iter()
95            .cloned()
96            .enumerate()
97            .map(|(id, col)| (col, id))
98            .collect::<BTreeMap<_, _>>();
99        let unmatched_idx = output_col_map.len();
100        let primary_key = logical_scan.primary_key();
101        let primary_key_order = Order {
102            column_orders: primary_key
103                .iter()
104                .map(|o| {
105                    ColumnOrder::new(
106                        *output_col_map
107                            .get(&o.column_index)
108                            .unwrap_or(&unmatched_idx),
109                        o.order_type,
110                    )
111                })
112                .collect::<Vec<_>>(),
113        };
114        if primary_key_order.satisfies(order) {
115            Some(logical_top_n.clone_with_input(logical_scan.into()).into())
116        } else {
117            None
118        }
119    }
120}