risingwave_frontend/optimizer/plan_node/batch_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::LimitNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22    ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
27use crate::optimizer::property::{Distribution, Order, RequiredDist};
28
29/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchLimit {
32    pub base: PlanBase<Batch>,
33    core: generic::Limit<PlanRef>,
34}
35
36const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;
37
38impl BatchLimit {
39    pub fn new(core: generic::Limit<PlanRef>) -> Self {
40        let base = PlanBase::new_batch_with_core(
41            &core,
42            core.input.distribution().clone(),
43            core.input.order().clone(),
44        );
45        BatchLimit { base, core }
46    }
47
48    fn two_phase_limit(&self, new_input: PlanRef) -> Result<PlanRef> {
49        let new_limit = self.core.limit + self.core.offset;
50        let new_offset = 0;
51        let logical_partial_limit = generic::Limit::new(new_input.clone(), new_limit, new_offset);
52        let batch_partial_limit = Self::new(logical_partial_limit);
53        let any_order = Order::any();
54
55        let single_dist = RequiredDist::single();
56        let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
57            if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
58                BatchExchange::new_with_sequential(
59                    batch_partial_limit.into(),
60                    any_order,
61                    Distribution::Single,
62                )
63                .into()
64            } else {
65                BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
66                    .into()
67            }
68        } else {
69            // The input's distribution is singleton, so use one phase limit is enough.
70            return Ok(self.clone_with_input(new_input).into());
71        };
72
73        let batch_global_limit = self.clone_with_input(ensure_single_dist);
74        Ok(batch_global_limit.into())
75    }
76
77    pub fn limit(&self) -> u64 {
78        self.core.limit
79    }
80
81    pub fn offset(&self) -> u64 {
82        self.core.offset
83    }
84}
85
86impl PlanTreeNodeUnary<Batch> for BatchLimit {
87    fn input(&self) -> PlanRef {
88        self.core.input.clone()
89    }
90
91    fn clone_with_input(&self, input: PlanRef) -> Self {
92        let mut core = self.core.clone();
93        core.input = input;
94        Self::new(core)
95    }
96}
97impl_plan_tree_node_for_unary! { Batch, BatchLimit}
98impl_distill_by_unit!(BatchLimit, core, "BatchLimit");
99
100impl ToDistributedBatch for BatchLimit {
101    fn to_distributed(&self) -> Result<PlanRef> {
102        self.two_phase_limit(self.input().to_distributed()?)
103    }
104}
105
106impl ToBatchPb for BatchLimit {
107    fn to_batch_prost_body(&self) -> NodeBody {
108        NodeBody::Limit(LimitNode {
109            limit: self.core.limit,
110            offset: self.core.offset,
111        })
112    }
113}
114
115impl ToLocalBatch for BatchLimit {
116    fn to_local(&self) -> Result<PlanRef> {
117        self.two_phase_limit(self.input().to_local()?)
118    }
119}
120
121impl ExprRewritable<Batch> for BatchLimit {}
122
123impl ExprVisitable for BatchLimit {}