risingwave_frontend/optimizer/rule/batch/
batch_push_limit_to_scan_rule.rs1use itertools::Itertools;
16use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
17
18use super::prelude::*;
19use crate::optimizer::plan_node::generic::PhysicalPlanRef;
20use crate::optimizer::plan_node::{BatchIcebergScan, BatchLimit, BatchSeqScan, PlanTreeNodeUnary};
21
/// Batch optimizer rule that pushes the row budget of a `BatchLimit` into the scan directly
/// beneath it (a `BatchSeqScan` or a `BatchIcebergScan`).
///
/// The rule carries no state; obtain an instance through `create`.
pub struct BatchPushLimitToScanRule {}
23
/// Largest `limit + offset` that the rule will push into a `BatchIcebergScan`.
///
/// When the combined value is strictly greater than this, the rule does not fire for Iceberg
/// scans. `BatchSeqScan` inputs are not subject to this cutoff.
// NOTE(review): the rationale for the specific cutoff is not visible in this file — confirm
// with the author before tuning it.
const ICEBERG_LIMIT_PUSHDOWN_THRESHOLD: u64 = 1_000_000;
27
28impl Rule<Batch> for BatchPushLimitToScanRule {
29 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30 let limit: &BatchLimit = plan.as_batch_limit()?;
31 let limit_input = limit.input();
32
33 let pushed_limit = limit.limit().checked_add(limit.offset())?;
34 if let Some(scan) = limit_input.as_batch_seq_scan() {
35 if scan.limit().is_some() {
36 return None;
37 }
38 let new_scan = BatchSeqScan::new_with_dist(
39 scan.core().clone(),
40 scan.base.distribution().clone(),
41 scan.scan_ranges().iter().cloned().collect_vec(),
42 Some(pushed_limit),
43 );
44 return Some(limit.clone_with_input(new_scan.into()).into());
45 }
46
47 let scan: &BatchIcebergScan = limit_input.as_batch_iceberg_scan()?;
48 if scan.limit().is_some() {
49 return None;
50 }
51 if scan.iceberg_scan_type() != IcebergScanType::DataScan {
52 return None;
53 }
54 if pushed_limit > ICEBERG_LIMIT_PUSHDOWN_THRESHOLD {
55 return None;
56 }
57 let new_scan = scan.clone_with_limit(Some(pushed_limit));
58 Some(limit.clone_with_input(new_scan.into()).into())
59 }
60}
61
62impl BatchPushLimitToScanRule {
63 pub fn create() -> BoxedRule {
64 Box::new(BatchPushLimitToScanRule {})
65 }
66}