// risingwave_frontend/optimizer/plan_node/batch_limit.rs
use risingwave_pb::batch_plan::LimitNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
26use crate::optimizer::property::{Distribution, Order, RequiredDist};
27
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// Batch plan node that applies a row `limit` and `offset` to its single input.
///
/// The limit/offset values and the input live in `core`; `base` holds the
/// plan-node state derived from them when the node is constructed.
pub struct BatchLimit {
    /// Plan base built in [`BatchLimit::new`] from `core` together with the
    /// input's distribution and order.
    pub base: PlanBase<Batch>,
    /// The generic limit description: input plan, `limit` and `offset`.
    core: generic::Limit<PlanRef>,
}
34
/// Upper bound (exclusive) on the partial-phase row count (`limit + offset`)
/// below which `two_phase_limit` builds the gathering exchange with
/// `BatchExchange::new_with_sequential` instead of `BatchExchange::new`.
const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;
36
37impl BatchLimit {
38 pub fn new(core: generic::Limit<PlanRef>) -> Self {
39 let base = PlanBase::new_batch_with_core(
40 &core,
41 core.input.distribution().clone(),
42 core.input.order().clone(),
43 );
44 BatchLimit { base, core }
45 }
46
47 fn two_phase_limit(&self, new_input: PlanRef) -> Result<PlanRef> {
48 let new_limit = self.core.limit + self.core.offset;
49 let new_offset = 0;
50 let logical_partial_limit = generic::Limit::new(new_input.clone(), new_limit, new_offset);
51 let batch_partial_limit = Self::new(logical_partial_limit);
52 let any_order = Order::any();
53
54 let single_dist = RequiredDist::single();
55 let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
56 if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
57 BatchExchange::new_with_sequential(
58 batch_partial_limit.into(),
59 any_order,
60 Distribution::Single,
61 )
62 .into()
63 } else {
64 BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
65 .into()
66 }
67 } else {
68 return Ok(self.clone_with_input(new_input).into());
70 };
71
72 let batch_global_limit = self.clone_with_input(ensure_single_dist);
73 Ok(batch_global_limit.into())
74 }
75
76 pub fn limit(&self) -> u64 {
77 self.core.limit
78 }
79
80 pub fn offset(&self) -> u64 {
81 self.core.offset
82 }
83}
84
85impl PlanTreeNodeUnary for BatchLimit {
86 fn input(&self) -> PlanRef {
87 self.core.input.clone()
88 }
89
90 fn clone_with_input(&self, input: PlanRef) -> Self {
91 let mut core = self.core.clone();
92 core.input = input;
93 Self::new(core)
94 }
95}
// NOTE(review): macro defined elsewhere; presumably derives the generic
// plan-tree-node impl from the `PlanTreeNodeUnary` impl above — confirm.
impl_plan_tree_node_for_unary! {BatchLimit}
// NOTE(review): macro imported from `super::utils`; presumably implements the
// node's distill/explain output by delegating to the `core` field under the
// name "BatchLimit" — confirm against the macro definition.
impl_distill_by_unit!(BatchLimit, core, "BatchLimit");
98
99impl ToDistributedBatch for BatchLimit {
100 fn to_distributed(&self) -> Result<PlanRef> {
101 self.two_phase_limit(self.input().to_distributed()?)
102 }
103}
104
105impl ToBatchPb for BatchLimit {
106 fn to_batch_prost_body(&self) -> NodeBody {
107 NodeBody::Limit(LimitNode {
108 limit: self.core.limit,
109 offset: self.core.offset,
110 })
111 }
112}
113
114impl ToLocalBatch for BatchLimit {
115 fn to_local(&self) -> Result<PlanRef> {
116 self.two_phase_limit(self.input().to_local()?)
117 }
118}
119
// Empty impl: `BatchLimit` overrides nothing, so the trait's defaults apply.
// NOTE(review): presumably because the node holds only numeric limit/offset
// values and no expressions to rewrite — confirm.
impl ExprRewritable for BatchLimit {}
121
// Empty impl: `BatchLimit` overrides nothing, so the trait's defaults apply.
// NOTE(review): presumably because the node holds no expressions to visit —
// confirm.
impl ExprVisitable for BatchLimit {}