risingwave_frontend/optimizer/plan_node/
batch_group_topn.rs1use risingwave_pb::batch_plan::GroupTopNNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::ToLocalBatch;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{Order, RequiredDist};
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchGroupTopN {
31 pub base: PlanBase<Batch>,
32 core: generic::TopN<PlanRef>,
33}
34
35impl BatchGroupTopN {
36 pub fn new(core: generic::TopN<PlanRef>) -> Self {
37 assert!(!core.group_key.is_empty());
38 let base =
39 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
40 BatchGroupTopN { base, core }
41 }
42
43 fn group_key(&self) -> &[usize] {
44 &self.core.group_key
45 }
46}
47
48impl_distill_by_unit!(BatchGroupTopN, core, "BatchGroupTopN");
49
50impl PlanTreeNodeUnary for BatchGroupTopN {
51 fn input(&self) -> PlanRef {
52 self.core.input.clone()
53 }
54
55 fn clone_with_input(&self, input: PlanRef) -> Self {
56 let mut core = self.core.clone();
57 core.input = input;
58 Self::new(core)
59 }
60}
61
62impl_plan_tree_node_for_unary! {BatchGroupTopN}
63
64impl ToDistributedBatch for BatchGroupTopN {
65 fn to_distributed(&self) -> Result<PlanRef> {
66 let input = self.input().to_distributed()?;
67 let input = RequiredDist::hash_shard(self.group_key())
68 .enforce_if_not_satisfies(input, &Order::any())?;
69 Ok(self.clone_with_input(input).into())
70 }
71}
72
73impl ToBatchPb for BatchGroupTopN {
74 fn to_batch_prost_body(&self) -> NodeBody {
75 let column_orders = self.core.order.to_protobuf();
76 NodeBody::GroupTopN(GroupTopNNode {
77 limit: self.core.limit_attr.limit(),
78 offset: self.core.offset,
79 column_orders,
80 group_key: self.group_key().iter().map(|c| *c as u32).collect(),
81 with_ties: self.core.limit_attr.with_ties(),
82 })
83 }
84}
85
86impl ToLocalBatch for BatchGroupTopN {
87 fn to_local(&self) -> Result<PlanRef> {
88 let input = self.input().to_local()?;
89 let input = RequiredDist::single().enforce_if_not_satisfies(input, &Order::any())?;
90 Ok(self.clone_with_input(input).into())
91 }
92}
93
94impl ExprRewritable for BatchGroupTopN {}
95
96impl ExprVisitable for BatchGroupTopN {}