risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs1use std::collections::BTreeMap;
21use std::vec;
22
23use itertools::Itertools;
24use risingwave_common::types::DataType;
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_expr::aggregate::{AggType, PbAggKind};
27
28use super::{BoxedRule, Rule};
29use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
30use crate::optimizer::PlanRef;
31use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
32use crate::optimizer::plan_node::{
33 LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
34};
35use crate::optimizer::property::Order;
36use crate::utils::{Condition, IndexSet};
37
/// Optimizer rule for a single, ungrouped `MIN`/`MAX` aggregate placed directly on a
/// predicate-free table scan.
///
/// When a covering index or the primary key already provides the ordering of the aggregated
/// column, the plan is rewritten to `Agg <- TopN(limit 1) <- Filter(IS NOT NULL) <- Scan`.
/// The struct carries no state.
pub struct MinMaxOnIndexRule {}
39
40impl Rule for MinMaxOnIndexRule {
41 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
42 let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
43 if !logical_agg.group_key().is_empty() {
44 return None;
45 }
46 let calls = logical_agg.agg_calls();
47 if calls.is_empty() {
48 return None;
49 }
50 let first_call = calls.iter().exactly_one().ok()?;
51
52 if matches!(
53 first_call.agg_type,
54 AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
55 ) && !first_call.distinct
56 && first_call.filter.always_true()
57 && first_call.order_by.is_empty()
58 {
59 let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
60 let kind = &calls.first()?.agg_type;
61 if !logical_scan.predicate().always_true() {
62 return None;
63 }
64 let order = Order {
65 column_orders: vec![ColumnOrder::new(
66 calls.first()?.inputs.first()?.index(),
67 if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
68 OrderType::ascending()
69 } else {
70 OrderType::descending()
71 },
72 )],
73 };
74 if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
75 Some(p)
76 } else {
77 self.try_on_pk(logical_agg, logical_scan, &order)
78 }
79 } else {
80 None
81 }
82 }
83}
84
85impl MinMaxOnIndexRule {
86 pub fn create() -> BoxedRule {
87 Box::new(MinMaxOnIndexRule {})
88 }
89
90 fn try_on_index(
91 &self,
92 logical_agg: &LogicalAgg,
93 logical_scan: LogicalScan,
94 required_order: &Order,
95 ) -> Option<PlanRef> {
96 let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
97 for index in order_satisfied_index {
98 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
99 let non_null_filter = LogicalFilter::create_with_expr(
100 index_scan.into(),
101 FunctionCall::new_unchecked(
102 ExprType::IsNotNull,
103 vec![ExprImpl::InputRef(Box::new(InputRef::new(
104 0,
105 logical_agg.schema().fields[0].data_type.clone(),
106 )))],
107 DataType::Boolean,
108 )
109 .into(),
110 );
111
112 let topn =
113 LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
114
115 let formatting_agg = Agg::new(
116 vec![PlanAggCall {
117 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
118 return_type: logical_agg.schema().fields[0].data_type.clone(),
119 inputs: vec![InputRef::new(
120 0,
121 logical_agg.schema().fields[0].data_type.clone(),
122 )],
123 order_by: vec![],
124 distinct: false,
125 filter: Condition {
126 conjunctions: vec![],
127 },
128 direct_args: vec![],
129 }],
130 IndexSet::empty(),
131 topn.into(),
132 );
133
134 return Some(formatting_agg.into());
135 }
136 }
137
138 None
139 }
140
141 fn try_on_pk(
142 &self,
143 logical_agg: &LogicalAgg,
144 logical_scan: LogicalScan,
145 order: &Order,
146 ) -> Option<PlanRef> {
147 let output_col_map = logical_scan
148 .output_col_idx()
149 .iter()
150 .cloned()
151 .enumerate()
152 .map(|(id, col)| (col, id))
153 .collect::<BTreeMap<_, _>>();
154 let unmatched_idx = output_col_map.len();
155 let primary_key = logical_scan.primary_key();
156 let primary_key_order = Order {
157 column_orders: primary_key
158 .iter()
159 .map(|o| {
160 ColumnOrder::new(
161 *output_col_map
162 .get(&o.column_index)
163 .unwrap_or(&unmatched_idx),
164 o.order_type,
165 )
166 })
167 .collect::<Vec<_>>(),
168 };
169 if primary_key_order.satisfies(order) {
170 let non_null_filter = LogicalFilter::create_with_expr(
171 logical_scan.into(),
172 FunctionCall::new_unchecked(
173 ExprType::IsNotNull,
174 vec![ExprImpl::InputRef(Box::new(InputRef::new(
175 0,
176 logical_agg.schema().fields[0].data_type.clone(),
177 )))],
178 DataType::Boolean,
179 )
180 .into(),
181 );
182
183 let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
184
185 let formatting_agg = Agg::new(
186 vec![PlanAggCall {
187 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
188 return_type: logical_agg.schema().fields[0].data_type.clone(),
189 inputs: vec![InputRef::new(
190 0,
191 logical_agg.schema().fields[0].data_type.clone(),
192 )],
193 order_by: vec![],
194 distinct: false,
195 filter: Condition {
196 conjunctions: vec![],
197 },
198 direct_args: vec![],
199 }],
200 IndexSet::empty(),
201 topn.into(),
202 );
203
204 Some(formatting_agg.into())
205 } else {
206 None
207 }
208 }
209}