risingwave_frontend/optimizer/rule/
min_max_on_index_rule.rs1use std::collections::BTreeMap;
21use std::vec;
22
23use itertools::Itertools;
24use risingwave_common::types::DataType;
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_expr::aggregate::{AggType, PbAggKind};
27
28use super::prelude::{PlanRef, *};
29use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
30use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
31use crate::optimizer::plan_node::{
32 LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
33};
34use crate::optimizer::property::Order;
35use crate::utils::{Condition, IndexSet};
36
/// Optimizer rule for a single, ungrouped `MIN`/`MAX` aggregate placed directly
/// on top of a table scan.
///
/// When an index or the primary key of the scanned table already provides the
/// ordering of the aggregated column, the rule rewrites the plan into
/// `Agg <- TopN(limit 1) <- Filter(IS NOT NULL) <- Scan`, so only the first
/// row in that order is needed instead of aggregating every row.
///
/// The struct carries no state; it only exists to implement `Rule`.
pub struct MinMaxOnIndexRule {}
38
39impl Rule<Logical> for MinMaxOnIndexRule {
40 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41 let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
42 if !logical_agg.group_key().is_empty() {
43 return None;
44 }
45 let calls = logical_agg.agg_calls();
46 if calls.is_empty() {
47 return None;
48 }
49 let first_call = calls.iter().exactly_one().ok()?;
50
51 if matches!(
52 first_call.agg_type,
53 AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
54 ) && !first_call.distinct
55 && first_call.filter.always_true()
56 && first_call.order_by.is_empty()
57 {
58 let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
59 let kind = &calls.first()?.agg_type;
60 if !logical_scan.predicate().always_true() {
61 return None;
62 }
63 let order = Order {
64 column_orders: vec![ColumnOrder::new(
65 calls.first()?.inputs.first()?.index(),
66 if matches!(kind, AggType::Builtin(PbAggKind::Min)) {
67 OrderType::ascending()
68 } else {
69 OrderType::descending()
70 },
71 )],
72 };
73 if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
74 Some(p)
75 } else {
76 self.try_on_pk(logical_agg, logical_scan, &order)
77 }
78 } else {
79 None
80 }
81 }
82}
83
84impl MinMaxOnIndexRule {
85 pub fn create() -> BoxedRule {
86 Box::new(MinMaxOnIndexRule {})
87 }
88
89 fn try_on_index(
90 &self,
91 logical_agg: &LogicalAgg,
92 logical_scan: LogicalScan,
93 required_order: &Order,
94 ) -> Option<PlanRef> {
95 let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
96 for index in order_satisfied_index {
97 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
98 let non_null_filter = LogicalFilter::create_with_expr(
99 index_scan.into(),
100 FunctionCall::new_unchecked(
101 ExprType::IsNotNull,
102 vec![ExprImpl::InputRef(Box::new(InputRef::new(
103 0,
104 logical_agg.schema().fields[0].data_type.clone(),
105 )))],
106 DataType::Boolean,
107 )
108 .into(),
109 );
110
111 let topn =
112 LogicalTopN::new(non_null_filter, 1, 0, false, required_order.clone(), vec![]);
113
114 let formatting_agg = Agg::new(
115 vec![PlanAggCall {
116 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
117 return_type: logical_agg.schema().fields[0].data_type.clone(),
118 inputs: vec![InputRef::new(
119 0,
120 logical_agg.schema().fields[0].data_type.clone(),
121 )],
122 order_by: vec![],
123 distinct: false,
124 filter: Condition {
125 conjunctions: vec![],
126 },
127 direct_args: vec![],
128 }],
129 IndexSet::empty(),
130 topn.into(),
131 );
132
133 return Some(formatting_agg.into());
134 }
135 }
136
137 None
138 }
139
140 fn try_on_pk(
141 &self,
142 logical_agg: &LogicalAgg,
143 logical_scan: LogicalScan,
144 order: &Order,
145 ) -> Option<PlanRef> {
146 let output_col_map = logical_scan
147 .output_col_idx()
148 .iter()
149 .cloned()
150 .enumerate()
151 .map(|(id, col)| (col, id))
152 .collect::<BTreeMap<_, _>>();
153 let unmatched_idx = output_col_map.len();
154 let primary_key = logical_scan.primary_key();
155 let primary_key_order = Order {
156 column_orders: primary_key
157 .iter()
158 .map(|o| {
159 ColumnOrder::new(
160 *output_col_map
161 .get(&o.column_index)
162 .unwrap_or(&unmatched_idx),
163 o.order_type,
164 )
165 })
166 .collect::<Vec<_>>(),
167 };
168 if primary_key_order.satisfies(order) {
169 let non_null_filter = LogicalFilter::create_with_expr(
170 logical_scan.into(),
171 FunctionCall::new_unchecked(
172 ExprType::IsNotNull,
173 vec![ExprImpl::InputRef(Box::new(InputRef::new(
174 0,
175 logical_agg.schema().fields[0].data_type.clone(),
176 )))],
177 DataType::Boolean,
178 )
179 .into(),
180 );
181
182 let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);
183
184 let formatting_agg = Agg::new(
185 vec![PlanAggCall {
186 agg_type: logical_agg.agg_calls().first()?.agg_type.clone(),
187 return_type: logical_agg.schema().fields[0].data_type.clone(),
188 inputs: vec![InputRef::new(
189 0,
190 logical_agg.schema().fields[0].data_type.clone(),
191 )],
192 order_by: vec![],
193 distinct: false,
194 filter: Condition {
195 conjunctions: vec![],
196 },
197 direct_args: vec![],
198 }],
199 IndexSet::empty(),
200 topn.into(),
201 );
202
203 Some(formatting_agg.into())
204 } else {
205 None
206 }
207 }
208}