risingwave_frontend/optimizer/plan_node/
batch_topn.rs1use risingwave_pb::batch_plan::TopNNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::generic::TopNLimit;
20use super::utils::impl_distill_by_unit;
21use super::{
22 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{BatchLimit, ToLocalBatch};
27use crate::optimizer::property::{Order, RequiredDist};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchTopN {
32 pub base: PlanBase<Batch>,
33 core: generic::TopN<PlanRef>,
34}
35
36impl BatchTopN {
37 pub fn new(core: generic::TopN<PlanRef>) -> Self {
38 assert!(core.group_key.is_empty());
39 let base = PlanBase::new_batch_with_core(
40 &core,
41 core.input.distribution().clone(),
42 core.order.clone(),
44 );
45 BatchTopN { base, core }
46 }
47
48 fn two_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
49 let new_limit = TopNLimit::new(
50 self.core.limit_attr.limit() + self.core.offset,
51 self.core.limit_attr.with_ties(),
52 );
53 let new_offset = 0;
54 let partial_input: PlanRef = if input.order().satisfies(&self.core.order)
55 && !self.core.limit_attr.with_ties()
56 {
57 let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset);
58 let batch_partial_limit = BatchLimit::new(logical_partial_limit);
59 batch_partial_limit.into()
60 } else {
61 let logical_partial_topn =
62 generic::TopN::without_group(input, new_limit, new_offset, self.core.order.clone());
63 let batch_partial_topn = Self::new(logical_partial_topn);
64 batch_partial_topn.into()
65 };
66
67 let ensure_single_dist =
68 RequiredDist::single().enforce_if_not_satisfies(partial_input, &Order::any())?;
69
70 let batch_global_topn = self.clone_with_input(ensure_single_dist);
71 Ok(batch_global_topn.into())
72 }
73
74 fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
75 if input.order().satisfies(&self.core.order) && !self.core.limit_attr.with_ties() {
76 let logical_limit =
77 generic::Limit::new(input, self.core.limit_attr.limit(), self.core.offset);
78 let batch_limit = BatchLimit::new(logical_limit);
79 Ok(batch_limit.into())
80 } else {
81 Ok(self.clone_with_input(input).into())
82 }
83 }
84}
85
86impl_distill_by_unit!(BatchTopN, core, "BatchTopN");
87
88impl PlanTreeNodeUnary for BatchTopN {
89 fn input(&self) -> PlanRef {
90 self.core.input.clone()
91 }
92
93 fn clone_with_input(&self, input: PlanRef) -> Self {
94 let mut core = self.core.clone();
95 core.input = input;
96 Self::new(core)
97 }
98}
99
100impl_plan_tree_node_for_unary! {BatchTopN}
101
102impl ToDistributedBatch for BatchTopN {
103 fn to_distributed(&self) -> Result<PlanRef> {
104 let input = self.input().to_distributed()?;
105 let single_dist = RequiredDist::single();
106 if input.distribution().satisfies(&single_dist) {
107 self.one_phase_topn(input)
108 } else {
109 self.two_phase_topn(input)
110 }
111 }
112}
113
114impl ToBatchPb for BatchTopN {
115 fn to_batch_prost_body(&self) -> NodeBody {
116 let column_orders = self.core.order.to_protobuf();
117 NodeBody::TopN(TopNNode {
118 limit: self.core.limit_attr.limit(),
119 offset: self.core.offset,
120 column_orders,
121 with_ties: self.core.limit_attr.with_ties(),
122 })
123 }
124}
125
126impl ToLocalBatch for BatchTopN {
127 fn to_local(&self) -> Result<PlanRef> {
128 let input = self.input().to_local()?;
129 let single_dist = RequiredDist::single();
130 if input.distribution().satisfies(&single_dist) {
131 self.one_phase_topn(input)
132 } else {
133 self.two_phase_topn(input)
134 }
135 }
136}
137
138impl ExprRewritable for BatchTopN {}
139
140impl ExprVisitable for BatchTopN {}