risingwave_frontend/optimizer/rule/
iceberg_count_star_rule.rs1use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
16
17use super::prelude::*;
18use crate::expr::ExprImpl;
19use crate::optimizer::plan_node::generic::GenericPlanRef;
20use crate::optimizer::plan_node::{LogicalValues, PlanAggCall, PlanTreeNodeUnary};
21
22pub struct IcebergCountStarRule;
23
24impl Rule<Logical> for IcebergCountStarRule {
25 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26 let agg = plan.as_logical_agg()?;
27 let core = agg.core();
28 if core.group_key.is_empty()
29 && core.grouping_sets.is_empty()
30 && core.agg_calls.len() == 1
31 && core.agg_calls[0] == PlanAggCall::count_star()
32 {
33 let input = agg.input();
34 let iceberg_scan = input.as_logical_iceberg_scan()?;
35 if iceberg_scan.iceberg_scan_type() != IcebergScanType::DataScan {
36 return None;
37 }
38 if iceberg_scan.task.predicate().is_some() {
39 return None;
40 }
41 let count: i64 = iceberg_scan
42 .task
43 .tasks()
44 .iter()
45 .map(|task| task.record_count)
46 .sum::<Option<u64>>()?
47 .try_into()
48 .ok()?;
49 return Some(
50 LogicalValues::new(
51 vec![vec![ExprImpl::literal_bigint(count)]],
52 agg.schema().clone(),
53 agg.ctx(),
54 )
55 .into(),
56 );
57 }
58 None
59 }
60}
61
62impl IcebergCountStarRule {
63 pub fn create() -> BoxedRule {
64 Box::new(Self)
65 }
66}