risingwave_frontend/optimizer/plan_node/
batch_limit.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_pb::batch_plan::LimitNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
26use crate::optimizer::property::{Distribution, Order, RequiredDist};
27
28/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchLimit {
31    pub base: PlanBase<Batch>,
32    core: generic::Limit<PlanRef>,
33}
34
/// Cutoff (1024) consulted by `BatchLimit::two_phase_limit`: when the partial limit is
/// strictly below this value the gathering exchange is built with
/// `BatchExchange::new_with_sequential`; otherwise `BatchExchange::new` is used.
const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1 << 10;
36
37impl BatchLimit {
38    pub fn new(core: generic::Limit<PlanRef>) -> Self {
39        let base = PlanBase::new_batch_with_core(
40            &core,
41            core.input.distribution().clone(),
42            core.input.order().clone(),
43        );
44        BatchLimit { base, core }
45    }
46
47    fn two_phase_limit(&self, new_input: PlanRef) -> Result<PlanRef> {
48        let new_limit = self.core.limit + self.core.offset;
49        let new_offset = 0;
50        let logical_partial_limit = generic::Limit::new(new_input.clone(), new_limit, new_offset);
51        let batch_partial_limit = Self::new(logical_partial_limit);
52        let any_order = Order::any();
53
54        let single_dist = RequiredDist::single();
55        let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
56            if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
57                BatchExchange::new_with_sequential(
58                    batch_partial_limit.into(),
59                    any_order,
60                    Distribution::Single,
61                )
62                .into()
63            } else {
64                BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
65                    .into()
66            }
67        } else {
68            // The input's distribution is singleton, so use one phase limit is enough.
69            return Ok(self.clone_with_input(new_input).into());
70        };
71
72        let batch_global_limit = self.clone_with_input(ensure_single_dist);
73        Ok(batch_global_limit.into())
74    }
75
76    pub fn limit(&self) -> u64 {
77        self.core.limit
78    }
79
80    pub fn offset(&self) -> u64 {
81        self.core.offset
82    }
83}
84
85impl PlanTreeNodeUnary for BatchLimit {
86    fn input(&self) -> PlanRef {
87        self.core.input.clone()
88    }
89
90    fn clone_with_input(&self, input: PlanRef) -> Self {
91        let mut core = self.core.clone();
92        core.input = input;
93        Self::new(core)
94    }
95}
96impl_plan_tree_node_for_unary! {BatchLimit}
97impl_distill_by_unit!(BatchLimit, core, "BatchLimit");
98
99impl ToDistributedBatch for BatchLimit {
100    fn to_distributed(&self) -> Result<PlanRef> {
101        self.two_phase_limit(self.input().to_distributed()?)
102    }
103}
104
105impl ToBatchPb for BatchLimit {
106    fn to_batch_prost_body(&self) -> NodeBody {
107        NodeBody::Limit(LimitNode {
108            limit: self.core.limit,
109            offset: self.core.offset,
110        })
111    }
112}
113
114impl ToLocalBatch for BatchLimit {
115    fn to_local(&self) -> Result<PlanRef> {
116        self.two_phase_limit(self.input().to_local()?)
117    }
118}
119
// `BatchLimit` overrides nothing from `ExprRewritable`; the impl body is empty.
// NOTE(review): presumably because this node carries no expressions (its core is accessed
// only for an input, a limit and an offset in this file) — confirm against `generic::Limit`.
impl ExprRewritable for BatchLimit {}
121
// `BatchLimit` overrides nothing from `ExprVisitable`; the impl body is empty.
// NOTE(review): presumably because this node carries no expressions to visit — confirm
// against `generic::Limit`.
impl ExprVisitable for BatchLimit {}