risingwave_frontend/optimizer/plan_node/
batch_limit.rs1use risingwave_pb::batch_plan::LimitNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22 ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
27use crate::optimizer::property::{Distribution, Order, RequiredDist};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchLimit {
32 pub base: PlanBase<Batch>,
33 core: generic::Limit<PlanRef>,
34}
35
36const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;
37
38impl BatchLimit {
39 pub fn new(core: generic::Limit<PlanRef>) -> Self {
40 let base = PlanBase::new_batch_with_core(
41 &core,
42 core.input.distribution().clone(),
43 core.input.order().clone(),
44 );
45 BatchLimit { base, core }
46 }
47
48 fn two_phase_limit(&self, new_input: PlanRef) -> Result<PlanRef> {
49 let new_limit = self.core.limit + self.core.offset;
50 let new_offset = 0;
51 let logical_partial_limit = generic::Limit::new(new_input.clone(), new_limit, new_offset);
52 let batch_partial_limit = Self::new(logical_partial_limit);
53 let any_order = Order::any();
54
55 let single_dist = RequiredDist::single();
56 let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
57 if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
58 BatchExchange::new_with_sequential(
59 batch_partial_limit.into(),
60 any_order,
61 Distribution::Single,
62 )
63 .into()
64 } else {
65 BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
66 .into()
67 }
68 } else {
69 return Ok(self.clone_with_input(new_input).into());
71 };
72
73 let batch_global_limit = self.clone_with_input(ensure_single_dist);
74 Ok(batch_global_limit.into())
75 }
76
77 pub fn limit(&self) -> u64 {
78 self.core.limit
79 }
80
81 pub fn offset(&self) -> u64 {
82 self.core.offset
83 }
84}
85
86impl PlanTreeNodeUnary<Batch> for BatchLimit {
87 fn input(&self) -> PlanRef {
88 self.core.input.clone()
89 }
90
91 fn clone_with_input(&self, input: PlanRef) -> Self {
92 let mut core = self.core.clone();
93 core.input = input;
94 Self::new(core)
95 }
96}
97impl_plan_tree_node_for_unary! { Batch, BatchLimit}
98impl_distill_by_unit!(BatchLimit, core, "BatchLimit");
99
100impl ToDistributedBatch for BatchLimit {
101 fn to_distributed(&self) -> Result<PlanRef> {
102 self.two_phase_limit(self.input().to_distributed()?)
103 }
104}
105
106impl ToBatchPb for BatchLimit {
107 fn to_batch_prost_body(&self) -> NodeBody {
108 NodeBody::Limit(LimitNode {
109 limit: self.core.limit,
110 offset: self.core.offset,
111 })
112 }
113}
114
115impl ToLocalBatch for BatchLimit {
116 fn to_local(&self) -> Result<PlanRef> {
117 self.two_phase_limit(self.input().to_local()?)
118 }
119}
120
121impl ExprRewritable<Batch> for BatchLimit {}
122
123impl ExprVisitable for BatchLimit {}