risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::BTreeMap;
21use std::vec;
22
23use itertools::Itertools;
24use risingwave_common::types::DataType;
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_expr::aggregate::{AggType, PbAggKind};
27
28use super::prelude::{PlanRef, *};
29use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
30use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
31use crate::optimizer::plan_node::{
32    LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
33};
34use crate::optimizer::property::Order;
35use crate::utils::{Condition, IndexSet};
36
/// Optimizer rule that rewrites a single ungrouped `min`/`max` aggregation over an
/// unfiltered table scan into a "top 1" lookup on an index (or on the table's primary
/// key) whose ordering already matches the aggregated column.
pub struct MinMaxOnIndexRule {}
38
39impl Rule<Logical> for MinMaxOnIndexRule {
40    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41        let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
42        if !logical_agg.group_key().is_empty() {
43            return None;
44        }
45        let calls = logical_agg.agg_calls();
46        if calls.is_empty() {
47            return None;
48        }
49        let first_call = calls.iter().exactly_one().ok()?;
50
51        if matches!(
52            first_call.agg_type,
53            AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
54        ) && !first_call.distinct
55            && first_call.filter.always_true()
56            && first_call.order_by.is_empty()
57        {
58            let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
59            let kind = &calls.first()?.agg_type;
60            if !logical_scan.predicate().always_true() {
61                return None;
62            }
63            let order = Order {
64                column_orders: vec![ColumnOrder::new(
65                    calls.first()?.inputs.first()?.index(),
66                    if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
67                        OrderType::ascending()
68                    } else {
69                        OrderType::descending()
70                    },
71                )],
72            };
73            if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
74                Some(p)
75            } else {
76                self.try_on_pk(logical_agg, logical_scan, &order)
77            }
78        } else {
79            None
80        }
81    }
82}
83
84impl MinMaxOnIndexRule {
85    pub fn create() -> BoxedRule {
86        Box::new(MinMaxOnIndexRule {})
87    }
88
89    fn try_on_index(
90        &self,
91        logical_agg: &LogicalAgg,
92        logical_scan: LogicalScan,
93        required_order: &Order,
94    ) -> Option<PlanRef> {
95        let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
96        for index in order_satisfied_index {
97            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
98                let non_null_filter = LogicalFilter::create_with_expr(
99                    index_scan.into(),
100                    FunctionCall::new_unchecked(
101                        ExprType::IsNotNull,
102                        vec![ExprImpl::InputRef(Box::new(InputRef::new(
103                            0,
104                            logical_agg.schema().fields[0].data_type.clone(),
105                        )))],
106                        DataType::Boolean,
107                    )
108                    .into(),
109                );
110
111                let topn =
112                    LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
113
114                let formatting_agg = Agg::new(
115                    vec![PlanAggCall {
116                        agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
117                        return_type: logical_agg.schema().fields[0].data_type.clone(),
118                        inputs: vec![InputRef::new(
119                            0,
120                            logical_agg.schema().fields[0].data_type.clone(),
121                        )],
122                        order_by: vec![],
123                        distinct: false,
124                        filter: Condition {
125                            conjunctions: vec![],
126                        },
127                        direct_args: vec![],
128                    }],
129                    IndexSet::empty(),
130                    topn.into(),
131                );
132
133                return Some(formatting_agg.into());
134            }
135        }
136
137        None
138    }
139
140    fn try_on_pk(
141        &self,
142        logical_agg: &LogicalAgg,
143        logical_scan: LogicalScan,
144        order: &Order,
145    ) -> Option<PlanRef> {
146        let output_col_map = logical_scan
147            .output_col_idx()
148            .iter()
149            .cloned()
150            .enumerate()
151            .map(|(id, col)| (col, id))
152            .collect::<BTreeMap<_, _>>();
153        let unmatched_idx = output_col_map.len();
154        let primary_key = logical_scan.primary_key();
155        let primary_key_order = Order {
156            column_orders: primary_key
157                .iter()
158                .map(|o| {
159                    ColumnOrder::new(
160                        *output_col_map
161                            .get(&o.column_index)
162                            .unwrap_or(&unmatched_idx),
163                        o.order_type,
164                    )
165                })
166                .collect::<Vec<_>>(),
167        };
168        if primary_key_order.satisfies(order) {
169            let non_null_filter = LogicalFilter::create_with_expr(
170                logical_scan.into(),
171                FunctionCall::new_unchecked(
172                    ExprType::IsNotNull,
173                    vec![ExprImpl::InputRef(Box::new(InputRef::new(
174                        0,
175                        logical_agg.schema().fields[0].data_type.clone(),
176                    )))],
177                    DataType::Boolean,
178                )
179                .into(),
180            );
181
182            let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
183
184            let formatting_agg = Agg::new(
185                vec![PlanAggCall {
186                    agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
187                    return_type: logical_agg.schema().fields[0].data_type.clone(),
188                    inputs: vec![InputRef::new(
189                        0,
190                        logical_agg.schema().fields[0].data_type.clone(),
191                    )],
192                    order_by: vec![],
193                    distinct: false,
194                    filter: Condition {
195                        conjunctions: vec![],
196                    },
197                    direct_args: vec![],
198                }],
199                IndexSet::empty(),
200                topn.into(),
201            );
202
203            Some(formatting_agg.into())
204        } else {
205            None
206        }
207    }
208}