risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::vec;
17
18use itertools::Itertools;
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
21use risingwave_expr::aggregate::{AggType, PbAggKind};
22
23use super::prelude::{PlanRef, *};
24use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
25use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
26use crate::optimizer::plan_node::{
27    LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
28};
29use crate::optimizer::property::Order;
30use crate::utils::{Condition, IndexSet};
31
32pub struct MinMaxOnIndexRule {}
33
34impl Rule<Logical> for MinMaxOnIndexRule {
35    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
36        let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
37        if !logical_agg.group_key().is_empty() {
38            return None;
39        }
40        let calls = logical_agg.agg_calls();
41        if calls.is_empty() {
42            return None;
43        }
44        let first_call = calls.iter().exactly_one().ok()?;
45
46        if matches!(
47            first_call.agg_type,
48            AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
49        ) && !first_call.distinct
50            && first_call.filter.always_true()
51            && first_call.order_by.is_empty()
52        {
53            let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
54            let kind = &calls.first()?.agg_type;
55            if !logical_scan.predicate().always_true() {
56                return None;
57            }
58            let order = Order {
59                column_orders: vec![ColumnOrder::new(
60                    calls.first()?.inputs.first()?.index(),
61                    if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
62                        OrderType::ascending()
63                    } else {
64                        OrderType::descending()
65                    },
66                )],
67            };
68            if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
69                Some(p)
70            } else {
71                self.try_on_pk(logical_agg, logical_scan, &order)
72            }
73        } else {
74            None
75        }
76    }
77}
78
79impl MinMaxOnIndexRule {
80    pub fn create() -> BoxedRule {
81        Box::new(MinMaxOnIndexRule {})
82    }
83
84    fn try_on_index(
85        &self,
86        logical_agg: &LogicalAgg,
87        logical_scan: LogicalScan,
88        required_order: &Order,
89    ) -> Option<PlanRef> {
90        let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
91        for index in order_satisfied_index {
92            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
93                let non_null_filter = LogicalFilter::create_with_expr(
94                    index_scan.into(),
95                    FunctionCall::new_unchecked(
96                        ExprType::IsNotNull,
97                        vec![ExprImpl::InputRef(Box::new(InputRef::new(
98                            0,
99                            logical_agg.schema().fields[0].data_type.clone(),
100                        )))],
101                        DataType::Boolean,
102                    )
103                    .into(),
104                );
105
106                let topn =
107                    LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
108
109                let formatting_agg = Agg::new(
110                    vec![PlanAggCall {
111                        agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
112                        return_type: logical_agg.schema().fields[0].data_type.clone(),
113                        inputs: vec![InputRef::new(
114                            0,
115                            logical_agg.schema().fields[0].data_type.clone(),
116                        )],
117                        order_by: vec![],
118                        distinct: false,
119                        filter: Condition {
120                            conjunctions: vec![],
121                        },
122                        direct_args: vec![],
123                    }],
124                    IndexSet::empty(),
125                    topn.into(),
126                );
127
128                return Some(formatting_agg.into());
129            }
130        }
131
132        None
133    }
134
135    fn try_on_pk(
136        &self,
137        logical_agg: &LogicalAgg,
138        logical_scan: LogicalScan,
139        order: &Order,
140    ) -> Option<PlanRef> {
141        let output_col_map = logical_scan
142            .output_col_idx()
143            .iter()
144            .cloned()
145            .enumerate()
146            .map(|(id, col)| (col, id))
147            .collect::<BTreeMap<_, _>>();
148        let unmatched_idx = output_col_map.len();
149        let primary_key = logical_scan.primary_key();
150        let primary_key_order = Order {
151            column_orders: primary_key
152                .iter()
153                .map(|o| {
154                    ColumnOrder::new(
155                        *output_col_map
156                            .get(&o.column_index)
157                            .unwrap_or(&unmatched_idx),
158                        o.order_type,
159                    )
160                })
161                .collect::<Vec<_>>(),
162        };
163        if primary_key_order.satisfies(order) {
164            let non_null_filter = LogicalFilter::create_with_expr(
165                logical_scan.into(),
166                FunctionCall::new_unchecked(
167                    ExprType::IsNotNull,
168                    vec![ExprImpl::InputRef(Box::new(InputRef::new(
169                        0,
170                        logical_agg.schema().fields[0].data_type.clone(),
171                    )))],
172                    DataType::Boolean,
173                )
174                .into(),
175            );
176
177            let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
178
179            let formatting_agg = Agg::new(
180                vec![PlanAggCall {
181                    agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
182                    return_type: logical_agg.schema().fields[0].data_type.clone(),
183                    inputs: vec![InputRef::new(
184                        0,
185                        logical_agg.schema().fields[0].data_type.clone(),
186                    )],
187                    order_by: vec![],
188                    distinct: false,
189                    filter: Condition {
190                        conjunctions: vec![],
191                    },
192                    direct_args: vec![],
193                }],
194                IndexSet::empty(),
195                topn.into(),
196            );
197
198            Some(formatting_agg.into())
199        } else {
200            None
201        }
202    }
203}