risingwave_frontend/optimizer/rule/
top_n_on_index_rule.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::BTreeMap;
21
22use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
23
24use super::{BoxedRule, Rule};
25use crate::optimizer::PlanRef;
26use crate::optimizer::plan_node::{LogicalScan, LogicalTopN, PlanTreeNodeUnary};
27use crate::optimizer::property::Order;
28
/// Optimizer rule that rewrites a `LogicalTopN` placed directly on a `LogicalScan`.
///
/// The rule first checks whether the scan's primary-key order already satisfies the
/// top-n order; otherwise it looks for an index of the scanned table that satisfies
/// the order and can be scanned in place of the table.
///
/// The type carries no state; obtain a boxed instance through `TopNOnIndexRule::create`.
pub struct TopNOnIndexRule {}
30
31impl Rule for TopNOnIndexRule {
32    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
33        let logical_top_n: &LogicalTopN = plan.as_logical_top_n()?;
34        let logical_scan: LogicalScan = logical_top_n.input().as_logical_scan()?.to_owned();
35        let order = logical_top_n.topn_order();
36        if order.column_orders.is_empty() {
37            return None;
38        }
39        if let Some(p) = self.try_on_pk(logical_top_n, logical_scan.clone(), order) {
40            Some(p)
41        } else {
42            self.try_on_index(logical_top_n, logical_scan, order)
43        }
44    }
45}
46
47impl TopNOnIndexRule {
48    pub fn create() -> BoxedRule {
49        Box::new(TopNOnIndexRule {})
50    }
51
52    fn try_on_index(
53        &self,
54        logical_top_n: &LogicalTopN,
55        logical_scan: LogicalScan,
56        required_order: &Order,
57    ) -> Option<PlanRef> {
58        let scan_predicates = logical_scan.predicate();
59        let input_refs = scan_predicates.get_eq_const_input_refs();
60        let prefix = input_refs
61            .into_iter()
62            .flat_map(|input_ref| {
63                [
64                    ColumnOrder {
65                        column_index: input_ref.index,
66                        order_type: OrderType::ascending_nulls_first(),
67                    },
68                    ColumnOrder {
69                        column_index: input_ref.index,
70                        order_type: OrderType::ascending_nulls_last(),
71                    },
72                    ColumnOrder {
73                        column_index: input_ref.index,
74                        order_type: OrderType::descending_nulls_first(),
75                    },
76                    ColumnOrder {
77                        column_index: input_ref.index,
78                        order_type: OrderType::descending_nulls_last(),
79                    },
80                ]
81            })
82            .collect();
83        let order_satisfied_index =
84            logical_scan.indexes_satisfy_order_with_prefix(required_order, &prefix);
85        let mut longest_prefix: Option<Order> = None;
86        let mut selected_index = None;
87        for (index, prefix) in order_satisfied_index {
88            if prefix.len() >= longest_prefix.as_ref().map_or(0, |p| p.len()) {
89                longest_prefix = Some(prefix.clone());
90                if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
91                    selected_index = Some(
92                        logical_top_n
93                            .clone_with_input_and_prefix(index_scan.into(), prefix)
94                            .into(),
95                    );
96                }
97            }
98        }
99        selected_index
100    }
101
102    fn try_on_pk(
103        &self,
104        logical_top_n: &LogicalTopN,
105        logical_scan: LogicalScan,
106        order: &Order,
107    ) -> Option<PlanRef> {
108        let output_col_map = logical_scan
109            .output_col_idx()
110            .iter()
111            .cloned()
112            .enumerate()
113            .map(|(id, col)| (col, id))
114            .collect::<BTreeMap<_, _>>();
115        let unmatched_idx = output_col_map.len();
116        let primary_key = logical_scan.primary_key();
117        let primary_key_order = Order {
118            column_orders: primary_key
119                .iter()
120                .map(|o| {
121                    ColumnOrder::new(
122                        *output_col_map
123                            .get(&o.column_index)
124                            .unwrap_or(&unmatched_idx),
125                        o.order_type,
126                    )
127                })
128                .collect::<Vec<_>>(),
129        };
130        if primary_key_order.satisfies(order) {
131            Some(logical_top_n.clone_with_input(logical_scan.into()).into())
132        } else {
133            None
134        }
135    }
136}