risingwave_frontend/optimizer/plan_node/
batch_topn.rs1use risingwave_pb::batch_plan::TopNNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::generic::TopNLimit;
20use super::utils::impl_distill_by_unit;
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23 ToDistributedBatch, generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::{BatchLimit, ToLocalBatch};
28use crate::optimizer::property::{Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchTopN {
33 pub base: PlanBase<Batch>,
34 core: generic::TopN<PlanRef>,
35}
36
37impl BatchTopN {
38 pub fn new(core: generic::TopN<PlanRef>) -> Self {
39 assert!(core.group_key.is_empty());
40 let base = PlanBase::new_batch_with_core(
41 &core,
42 core.input.distribution().clone(),
43 core.order.clone(),
45 );
46 BatchTopN { base, core }
47 }
48
49 fn two_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
50 let new_limit = TopNLimit::new(
51 self.core.limit_attr.limit() + self.core.offset,
52 self.core.limit_attr.with_ties(),
53 );
54 let new_offset = 0;
55 let partial_input: PlanRef = if input.order().satisfies(&self.core.order)
56 && !self.core.limit_attr.with_ties()
57 {
58 let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset);
59 let batch_partial_limit = BatchLimit::new(logical_partial_limit);
60 batch_partial_limit.into()
61 } else {
62 let logical_partial_topn =
63 generic::TopN::without_group(input, new_limit, new_offset, self.core.order.clone());
64 let batch_partial_topn = Self::new(logical_partial_topn);
65 batch_partial_topn.into()
66 };
67
68 let ensure_single_dist =
69 RequiredDist::single().batch_enforce_if_not_satisfies(partial_input, &Order::any())?;
70
71 let batch_global_topn = self.clone_with_input(ensure_single_dist);
72 Ok(batch_global_topn.into())
73 }
74
75 fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
76 if input.order().satisfies(&self.core.order) && !self.core.limit_attr.with_ties() {
77 let logical_limit =
78 generic::Limit::new(input, self.core.limit_attr.limit(), self.core.offset);
79 let batch_limit = BatchLimit::new(logical_limit);
80 Ok(batch_limit.into())
81 } else {
82 Ok(self.clone_with_input(input).into())
83 }
84 }
85}
86
87impl_distill_by_unit!(BatchTopN, core, "BatchTopN");
88
89impl PlanTreeNodeUnary<Batch> for BatchTopN {
90 fn input(&self) -> PlanRef {
91 self.core.input.clone()
92 }
93
94 fn clone_with_input(&self, input: PlanRef) -> Self {
95 let mut core = self.core.clone();
96 core.input = input;
97 Self::new(core)
98 }
99}
100
101impl_plan_tree_node_for_unary! { Batch, BatchTopN}
102
103impl ToDistributedBatch for BatchTopN {
104 fn to_distributed(&self) -> Result<PlanRef> {
105 let input = self.input().to_distributed()?;
106 let single_dist = RequiredDist::single();
107 if input.distribution().satisfies(&single_dist) {
108 self.one_phase_topn(input)
109 } else {
110 self.two_phase_topn(input)
111 }
112 }
113}
114
115impl ToBatchPb for BatchTopN {
116 fn to_batch_prost_body(&self) -> NodeBody {
117 let column_orders = self.core.order.to_protobuf();
118 NodeBody::TopN(TopNNode {
119 limit: self.core.limit_attr.limit(),
120 offset: self.core.offset,
121 column_orders,
122 with_ties: self.core.limit_attr.with_ties(),
123 })
124 }
125}
126
127impl ToLocalBatch for BatchTopN {
128 fn to_local(&self) -> Result<PlanRef> {
129 let input = self.input().to_local()?;
130 let single_dist = RequiredDist::single();
131 if input.distribution().satisfies(&single_dist) {
132 self.one_phase_topn(input)
133 } else {
134 self.two_phase_topn(input)
135 }
136 }
137}
138
139impl ExprRewritable<Batch> for BatchTopN {}
140
141impl ExprVisitable for BatchTopN {}