risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs1use std::collections::BTreeMap;
16use std::vec;
17
18use itertools::Itertools;
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
21use risingwave_expr::aggregate::{AggType, PbAggKind};
22
23use super::prelude::{PlanRef, *};
24use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
25use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
26use crate::optimizer::plan_node::{
27 LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
28};
29use crate::optimizer::property::Order;
30use crate::utils::{Condition, IndexSet};
31
32pub struct MinMaxOnIndexRule {}
33
34impl Rule<Logical> for MinMaxOnIndexRule {
35 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
36 let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
37 if !logical_agg.group_key().is_empty() {
38 return None;
39 }
40 let calls = logical_agg.agg_calls();
41 if calls.is_empty() {
42 return None;
43 }
44 let first_call = calls.iter().exactly_one().ok()?;
45
46 if matches!(
47 first_call.agg_type,
48 AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
49 ) && !first_call.distinct
50 && first_call.filter.always_true()
51 && first_call.order_by.is_empty()
52 {
53 let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
54 let kind = &calls.first()?.agg_type;
55 if !logical_scan.predicate().always_true() {
56 return None;
57 }
58 let order = Order {
59 column_orders: vec![ColumnOrder::new(
60 calls.first()?.inputs.first()?.index(),
61 if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
62 OrderType::ascending()
63 } else {
64 OrderType::descending()
65 },
66 )],
67 };
68 if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
69 Some(p)
70 } else {
71 self.try_on_pk(logical_agg, logical_scan, &order)
72 }
73 } else {
74 None
75 }
76 }
77}
78
79impl MinMaxOnIndexRule {
80 pub fn create() -> BoxedRule {
81 Box::new(MinMaxOnIndexRule {})
82 }
83
84 fn try_on_index(
85 &self,
86 logical_agg: &LogicalAgg,
87 logical_scan: LogicalScan,
88 required_order: &Order,
89 ) -> Option<PlanRef> {
90 let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
91 for index in order_satisfied_index {
92 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
93 let non_null_filter = LogicalFilter::create_with_expr(
94 index_scan.into(),
95 FunctionCall::new_unchecked(
96 ExprType::IsNotNull,
97 vec![ExprImpl::InputRef(Box::new(InputRef::new(
98 0,
99 logical_agg.schema().fields[0].data_type.clone(),
100 )))],
101 DataType::Boolean,
102 )
103 .into(),
104 );
105
106 let topn =
107 LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
108
109 let formatting_agg = Agg::new(
110 vec![PlanAggCall {
111 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
112 return_type: logical_agg.schema().fields[0].data_type.clone(),
113 inputs: vec![InputRef::new(
114 0,
115 logical_agg.schema().fields[0].data_type.clone(),
116 )],
117 order_by: vec![],
118 distinct: false,
119 filter: Condition {
120 conjunctions: vec![],
121 },
122 direct_args: vec![],
123 }],
124 IndexSet::empty(),
125 topn.into(),
126 );
127
128 return Some(formatting_agg.into());
129 }
130 }
131
132 None
133 }
134
135 fn try_on_pk(
136 &self,
137 logical_agg: &LogicalAgg,
138 logical_scan: LogicalScan,
139 order: &Order,
140 ) -> Option<PlanRef> {
141 let output_col_map = logical_scan
142 .output_col_idx()
143 .iter()
144 .cloned()
145 .enumerate()
146 .map(|(id, col)| (col, id))
147 .collect::<BTreeMap<_, _>>();
148 let unmatched_idx = output_col_map.len();
149 let primary_key = logical_scan.primary_key();
150 let primary_key_order = Order {
151 column_orders: primary_key
152 .iter()
153 .map(|o| {
154 ColumnOrder::new(
155 *output_col_map
156 .get(&o.column_index)
157 .unwrap_or(&unmatched_idx),
158 o.order_type,
159 )
160 })
161 .collect::<Vec<_>>(),
162 };
163 if primary_key_order.satisfies(order) {
164 let non_null_filter = LogicalFilter::create_with_expr(
165 logical_scan.into(),
166 FunctionCall::new_unchecked(
167 ExprType::IsNotNull,
168 vec![ExprImpl::InputRef(Box::new(InputRef::new(
169 0,
170 logical_agg.schema().fields[0].data_type.clone(),
171 )))],
172 DataType::Boolean,
173 )
174 .into(),
175 );
176
177 let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
178
179 let formatting_agg = Agg::new(
180 vec![PlanAggCall {
181 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
182 return_type: logical_agg.schema().fields[0].data_type.clone(),
183 inputs: vec![InputRef::new(
184 0,
185 logical_agg.schema().fields[0].data_type.clone(),
186 )],
187 order_by: vec![],
188 distinct: false,
189 filter: Condition {
190 conjunctions: vec![],
191 },
192 direct_args: vec![],
193 }],
194 IndexSet::empty(),
195 topn.into(),
196 );
197
198 Some(formatting_agg.into())
199 } else {
200 None
201 }
202 }
203}