// risingwave_frontend/optimizer/rule/min_max_on_index_rule.rs
use std::collections::BTreeMap;
use std::vec;
use itertools::Itertools;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::{AggType, PbAggKind};
use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
};
use crate::optimizer::property::Order;
use crate::optimizer::PlanRef;
use crate::utils::{Condition, IndexSet};
/// Optimizer rule that tries to rewrite a single, ungrouped `min`/`max`
/// aggregate over an unfiltered table scan into a top-1 lookup on an ordered
/// index or on the primary key, so only one row has to be read.
pub struct MinMaxOnIndexRule {}
impl Rule for MinMaxOnIndexRule {
    /// Tries to rewrite `plan` into a top-1 read over an ordered index or the
    /// primary key.
    ///
    /// The rewrite is only attempted when all of the following hold; otherwise
    /// `None` is returned and the plan is left untouched:
    /// - `plan` is a logical aggregation with an empty group key,
    /// - it has exactly one aggregate call, a builtin `Min` or `Max`, that is
    ///   not `distinct`, has an always-true filter and no `order_by`,
    /// - the aggregation's input is a logical scan whose predicate is always
    ///   true.
    ///
    /// The required order is the aggregated column, ascending for `Min` and
    /// descending for `Max`. A covering index is tried first; the primary key
    /// is the fallback.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let agg: &LogicalAgg = plan.as_logical_agg()?;
        if !agg.group_key().is_empty() {
            return None;
        }

        // `exactly_one` fails for both zero and multiple calls, so no separate
        // emptiness check is needed.
        let calls = agg.agg_calls();
        let call = calls.iter().exactly_one().ok()?;

        let is_plain_min_max = matches!(
            call.agg_type,
            AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
        ) && !call.distinct
            && call.filter.always_true()
            && call.order_by.is_empty();
        if !is_plain_min_max {
            return None;
        }

        let scan: LogicalScan = agg.input().as_logical_scan()?.to_owned();
        if !scan.predicate().always_true() {
            return None;
        }

        // At this point the call is either `Min` or `Max`.
        let direction = match &call.agg_type {
            AggType::Builtin(PbAggKind::Min) => OrderType::ascending(),
            _ => OrderType::descending(),
        };
        let order = Order {
            column_orders: vec![ColumnOrder::new(call.inputs.first()?.index(), direction)],
        };

        self.try_on_index(agg, scan.clone(), &order)
            .or_else(|| self.try_on_pk(agg, scan, &order))
    }
}
impl MinMaxOnIndexRule {
    /// Creates a boxed instance of this rule.
    pub fn create() -> BoxedRule {
        Box::new(MinMaxOnIndexRule {})
    }

    /// Tries to serve the aggregate from an index of the scanned table.
    ///
    /// Walks the indexes that satisfy `required_order` and uses the first one
    /// that `to_index_scan_if_index_covered` can turn into an index scan.
    /// Returns `None` when no such index exists or the replacement plan cannot
    /// be built.
    fn try_on_index(
        &self,
        logical_agg: &LogicalAgg,
        logical_scan: LogicalScan,
        required_order: &Order,
    ) -> Option<PlanRef> {
        for index in logical_scan.indexes_satisfy_order(required_order) {
            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
                return Self::build_top1_agg(logical_agg, index_scan.into(), required_order);
            }
        }
        None
    }

    /// Tries to serve the aggregate directly from the table's primary key
    /// order.
    ///
    /// The primary key is re-expressed in terms of the scan's output column
    /// positions and compared against `order`. Returns `None` when the primary
    /// key order does not satisfy `order` or the replacement plan cannot be
    /// built.
    fn try_on_pk(
        &self,
        logical_agg: &LogicalAgg,
        logical_scan: LogicalScan,
        order: &Order,
    ) -> Option<PlanRef> {
        // Maps a table column index to its position in the scan output.
        let output_col_map = logical_scan
            .output_col_idx()
            .iter()
            .cloned()
            .enumerate()
            .map(|(id, col)| (col, id))
            .collect::<BTreeMap<_, _>>();
        // Primary-key columns missing from the scan output get an index one
        // past the last output position (presumably so they cannot coincide
        // with a column referenced by the required order).
        let unmatched_idx = output_col_map.len();
        let primary_key_order = Order {
            column_orders: logical_scan
                .primary_key()
                .iter()
                .map(|o| {
                    ColumnOrder::new(
                        *output_col_map
                            .get(&o.column_index)
                            .unwrap_or(&unmatched_idx),
                        o.order_type,
                    )
                })
                .collect::<Vec<_>>(),
        };

        if !primary_key_order.satisfies(order) {
            return None;
        }
        Self::build_top1_agg(logical_agg, logical_scan.into(), order)
    }

    /// Builds the replacement plan on top of `input`:
    /// `Agg(TopN(Filter(<agg column> IS NOT NULL, input)))`.
    ///
    /// The filter, the top-N order and the final aggregate must all refer to
    /// the same column of `input`. The column index is therefore taken from
    /// the aggregate call's own input rather than assumed to be `0`; the two
    /// only coincide when the aggregated column is the first one in `input`.
    ///
    /// Returns `None` if the aggregation has no call or the call has no input.
    fn build_top1_agg(
        logical_agg: &LogicalAgg,
        input: PlanRef,
        order: &Order,
    ) -> Option<PlanRef> {
        let call = logical_agg.agg_calls().first()?;
        let input_idx = call.inputs.first()?.index();
        // The aggregate's output type is reused as the type of the referenced
        // input column, as the original code did.
        let data_type = logical_agg.schema().fields[0].data_type.clone();

        // Null values are filtered out before the single row is picked.
        let non_null_filter = LogicalFilter::create_with_expr(
            input,
            FunctionCall::new_unchecked(
                ExprType::IsNotNull,
                vec![ExprImpl::InputRef(Box::new(InputRef::new(
                    input_idx,
                    data_type.clone(),
                )))],
                DataType::Boolean,
            )
            .into(),
        );

        // NOTE(review): the positional arguments `1, 0, false` are presumably
        // limit, offset and with-ties; confirm against `LogicalTopN::new`.
        let topn = LogicalTopN::new(non_null_filter, 1, 0, false, order.clone(), vec![]);

        // Re-apply the same aggregate kind over the (at most one) remaining
        // row so the resulting plan is still an aggregation.
        let formatting_agg = Agg::new(
            vec![PlanAggCall {
                agg_type: call.agg_type.clone(),
                return_type: data_type.clone(),
                inputs: vec![InputRef::new(input_idx, data_type)],
                order_by: vec![],
                distinct: false,
                filter: Condition {
                    conjunctions: vec![],
                },
                direct_args: vec![],
            }],
            IndexSet::empty(),
            topn.into(),
        );
        Some(formatting_agg.into())
    }
}