risingwave_frontend/optimizer/plan_node/
batch_group_topn.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::GroupTopNNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22    ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::ToLocalBatch;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Order, RequiredDist};
28
29/// `BatchGroupTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchGroupTopN {
32    pub base: PlanBase<Batch>,
33    core: generic::TopN<PlanRef>,
34}
35
36impl BatchGroupTopN {
37    pub fn new(core: generic::TopN<PlanRef>) -> Self {
38        assert!(!core.group_key.is_empty());
39        let base =
40            PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
41        BatchGroupTopN { base, core }
42    }
43
44    fn group_key(&self) -> &[usize] {
45        &self.core.group_key
46    }
47}
48
49impl_distill_by_unit!(BatchGroupTopN, core, "BatchGroupTopN");
50
51impl PlanTreeNodeUnary<Batch> for BatchGroupTopN {
52    fn input(&self) -> PlanRef {
53        self.core.input.clone()
54    }
55
56    fn clone_with_input(&self, input: PlanRef) -> Self {
57        let mut core = self.core.clone();
58        core.input = input;
59        Self::new(core)
60    }
61}
62
63impl_plan_tree_node_for_unary! { Batch, BatchGroupTopN}
64
65impl ToDistributedBatch for BatchGroupTopN {
66    fn to_distributed(&self) -> Result<PlanRef> {
67        let input = self.input().to_distributed()?;
68        let input = RequiredDist::hash_shard(self.group_key())
69            .batch_enforce_if_not_satisfies(input, &Order::any())?;
70        Ok(self.clone_with_input(input).into())
71    }
72}
73
74impl ToBatchPb for BatchGroupTopN {
75    fn to_batch_prost_body(&self) -> NodeBody {
76        let column_orders = self.core.order.to_protobuf();
77        NodeBody::GroupTopN(GroupTopNNode {
78            limit: self.core.limit_attr.limit(),
79            offset: self.core.offset,
80            column_orders,
81            group_key: self.group_key().iter().map(|c| *c as u32).collect(),
82            with_ties: self.core.limit_attr.with_ties(),
83        })
84    }
85}
86
87impl ToLocalBatch for BatchGroupTopN {
88    fn to_local(&self) -> Result<PlanRef> {
89        let input = self.input().to_local()?;
90        let input = RequiredDist::single().batch_enforce_if_not_satisfies(input, &Order::any())?;
91        Ok(self.clone_with_input(input).into())
92    }
93}
94
95impl ExprRewritable<Batch> for BatchGroupTopN {}
96
97impl ExprVisitable for BatchGroupTopN {}