risingwave_frontend/optimizer/plan_node/
batch_group_topn.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::GroupTopNNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::ToLocalBatch;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{Order, RequiredDist};
27
28/// `BatchGroupTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchGroupTopN {
31    pub base: PlanBase<Batch>,
32    core: generic::TopN<PlanRef>,
33}
34
35impl BatchGroupTopN {
36    pub fn new(core: generic::TopN<PlanRef>) -> Self {
37        assert!(!core.group_key.is_empty());
38        let base =
39            PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
40        BatchGroupTopN { base, core }
41    }
42
43    fn group_key(&self) -> &[usize] {
44        &self.core.group_key
45    }
46}
47
48impl_distill_by_unit!(BatchGroupTopN, core, "BatchGroupTopN");
49
50impl PlanTreeNodeUnary for BatchGroupTopN {
51    fn input(&self) -> PlanRef {
52        self.core.input.clone()
53    }
54
55    fn clone_with_input(&self, input: PlanRef) -> Self {
56        let mut core = self.core.clone();
57        core.input = input;
58        Self::new(core)
59    }
60}
61
62impl_plan_tree_node_for_unary! {BatchGroupTopN}
63
64impl ToDistributedBatch for BatchGroupTopN {
65    fn to_distributed(&self) -> Result<PlanRef> {
66        let input = self.input().to_distributed()?;
67        let input = RequiredDist::hash_shard(self.group_key())
68            .enforce_if_not_satisfies(input, &Order::any())?;
69        Ok(self.clone_with_input(input).into())
70    }
71}
72
73impl ToBatchPb for BatchGroupTopN {
74    fn to_batch_prost_body(&self) -> NodeBody {
75        let column_orders = self.core.order.to_protobuf();
76        NodeBody::GroupTopN(GroupTopNNode {
77            limit: self.core.limit_attr.limit(),
78            offset: self.core.offset,
79            column_orders,
80            group_key: self.group_key().iter().map(|c| *c as u32).collect(),
81            with_ties: self.core.limit_attr.with_ties(),
82        })
83    }
84}
85
86impl ToLocalBatch for BatchGroupTopN {
87    fn to_local(&self) -> Result<PlanRef> {
88        let input = self.input().to_local()?;
89        let input = RequiredDist::single().enforce_if_not_satisfies(input, &Order::any())?;
90        Ok(self.clone_with_input(input).into())
91    }
92}
93
94impl ExprRewritable for BatchGroupTopN {}
95
96impl ExprVisitable for BatchGroupTopN {}