risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::BTreeMap;
21use std::vec;
22
23use itertools::Itertools;
24use risingwave_common::types::DataType;
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_expr::aggregate::{AggType, PbAggKind};
27
28use super::{BoxedRule, Rule};
29use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
30use crate::optimizer::PlanRef;
31use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
32use crate::optimizer::plan_node::{
33    LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
34};
35use crate::optimizer::property::Order;
36use crate::utils::{Condition, IndexSet};
37
38pub struct MinMaxOnIndexRule {}
39
40impl Rule for MinMaxOnIndexRule {
41    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
42        let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
43        if !logical_agg.group_key().is_empty() {
44            return None;
45        }
46        let calls = logical_agg.agg_calls();
47        if calls.is_empty() {
48            return None;
49        }
50        let first_call = calls.iter().exactly_one().ok()?;
51
52        if matches!(
53            first_call.agg_type,
54            AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
55        ) && !first_call.distinct
56            && first_call.filter.always_true()
57            && first_call.order_by.is_empty()
58        {
59            let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
60            let kind = &calls.first()?.agg_type;
61            if !logical_scan.predicate().always_true() {
62                return None;
63            }
64            let order = Order {
65                column_orders: vec![ColumnOrder::new(
66                    calls.first()?.inputs.first()?.index(),
67                    if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
68                        OrderType::ascending()
69                    } else {
70                        OrderType::descending()
71                    },
72                )],
73            };
74            if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
75                Some(p)
76            } else {
77                self.try_on_pk(logical_agg, logical_scan, &order)
78            }
79        } else {
80            None
81        }
82    }
83}
84
85impl MinMaxOnIndexRule {
86    pub fn create() -> BoxedRule {
87        Box::new(MinMaxOnIndexRule {})
88    }
89
90    fn try_on_index(
91        &self,
92        logical_agg: &LogicalAgg,
93        logical_scan: LogicalScan,
94        required_order: &Order,
95    ) -> Option<PlanRef> {
96        let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
97        for index in order_satisfied_index {
98            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
99                let non_null_filter = LogicalFilter::create_with_expr(
100                    index_scan.into(),
101                    FunctionCall::new_unchecked(
102                        ExprType::IsNotNull,
103                        vec![ExprImpl::InputRef(Box::new(InputRef::new(
104                            0,
105                            logical_agg.schema().fields[0].data_type.clone(),
106                        )))],
107                        DataType::Boolean,
108                    )
109                    .into(),
110                );
111
112                let topn =
113                    LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
114
115                let formatting_agg = Agg::new(
116                    vec![PlanAggCall {
117                        agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
118                        return_type: logical_agg.schema().fields[0].data_type.clone(),
119                        inputs: vec![InputRef::new(
120                            0,
121                            logical_agg.schema().fields[0].data_type.clone(),
122                        )],
123                        order_by: vec![],
124                        distinct: false,
125                        filter: Condition {
126                            conjunctions: vec![],
127                        },
128                        direct_args: vec![],
129                    }],
130                    IndexSet::empty(),
131                    topn.into(),
132                );
133
134                return Some(formatting_agg.into());
135            }
136        }
137
138        None
139    }
140
141    fn try_on_pk(
142        &self,
143        logical_agg: &LogicalAgg,
144        logical_scan: LogicalScan,
145        order: &Order,
146    ) -> Option<PlanRef> {
147        let output_col_map = logical_scan
148            .output_col_idx()
149            .iter()
150            .cloned()
151            .enumerate()
152            .map(|(id, col)| (col, id))
153            .collect::<BTreeMap<_, _>>();
154        let unmatched_idx = output_col_map.len();
155        let primary_key = logical_scan.primary_key();
156        let primary_key_order = Order {
157            column_orders: primary_key
158                .iter()
159                .map(|o| {
160                    ColumnOrder::new(
161                        *output_col_map
162                            .get(&o.column_index)
163                            .unwrap_or(&unmatched_idx),
164                        o.order_type,
165                    )
166                })
167                .collect::<Vec<_>>(),
168        };
169        if primary_key_order.satisfies(order) {
170            let non_null_filter = LogicalFilter::create_with_expr(
171                logical_scan.into(),
172                FunctionCall::new_unchecked(
173                    ExprType::IsNotNull,
174                    vec![ExprImpl::InputRef(Box::new(InputRef::new(
175                        0,
176                        logical_agg.schema().fields[0].data_type.clone(),
177                    )))],
178                    DataType::Boolean,
179                )
180                .into(),
181            );
182
183            let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
184
185            let formatting_agg = Agg::new(
186                vec![PlanAggCall {
187                    agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
188                    return_type: logical_agg.schema().fields[0].data_type.clone(),
189                    inputs: vec![InputRef::new(
190                        0,
191                        logical_agg.schema().fields[0].data_type.clone(),
192                    )],
193                    order_by: vec![],
194                    distinct: false,
195                    filter: Condition {
196                        conjunctions: vec![],
197                    },
198                    direct_args: vec![],
199                }],
200                IndexSet::empty(),
201                topn.into(),
202            );
203
204            Some(formatting_agg.into())
205        } else {
206            None
207        }
208    }
209}