Skip to main content

risingwave_frontend/optimizer/rule/batch/
batch_push_limit_to_scan_rule.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
17
18use super::prelude::*;
19use crate::optimizer::plan_node::generic::PhysicalPlanRef;
20use crate::optimizer::plan_node::{BatchIcebergScan, BatchLimit, BatchSeqScan, PlanTreeNodeUnary};
21
/// Batch optimizer rule that pushes the row budget of a `BatchLimit` into the
/// scan directly beneath it.
///
/// The rule matches a `BatchLimit` whose input is either a `BatchSeqScan` or a
/// `BatchIcebergScan` and rewrites the scan so that it carries `limit + offset`
/// as its own limit. The original `BatchLimit` stays on top of the rewritten scan.
pub struct BatchPushLimitToScanRule {}
23
/// Largest `limit + offset` value that is still pushed into an Iceberg scan.
///
/// Pushing a limit into an Iceberg scan collapses file tasks into one split.
/// Keep large scans parallel and let the upper `BatchLimit` enforce exact results:
/// the rule is skipped whenever the pushed limit is strictly greater than this value.
const ICEBERG_LIMIT_PUSHDOWN_THRESHOLD: u64 = 1_000_000;
27
28impl Rule<Batch> for BatchPushLimitToScanRule {
29    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30        let limit: &BatchLimit = plan.as_batch_limit()?;
31        let limit_input = limit.input();
32
33        let pushed_limit = limit.limit().checked_add(limit.offset())?;
34        if let Some(scan) = limit_input.as_batch_seq_scan() {
35            if scan.limit().is_some() {
36                return None;
37            }
38            let new_scan = BatchSeqScan::new_with_dist(
39                scan.core().clone(),
40                scan.base.distribution().clone(),
41                scan.scan_ranges().iter().cloned().collect_vec(),
42                Some(pushed_limit),
43            );
44            return Some(limit.clone_with_input(new_scan.into()).into());
45        }
46
47        let scan: &BatchIcebergScan = limit_input.as_batch_iceberg_scan()?;
48        if scan.limit().is_some() {
49            return None;
50        }
51        if scan.iceberg_scan_type() != IcebergScanType::DataScan {
52            return None;
53        }
54        if pushed_limit > ICEBERG_LIMIT_PUSHDOWN_THRESHOLD {
55            return None;
56        }
57        let new_scan = scan.clone_with_limit(Some(pushed_limit));
58        Some(limit.clone_with_input(new_scan.into()).into())
59    }
60}
61
62impl BatchPushLimitToScanRule {
63    pub fn create() -> BoxedRule {
64        Box::new(BatchPushLimitToScanRule {})
65    }
66}