risingwave_frontend/optimizer/plan_node/
stream_topn.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{Distribution, MonotonicityMap, Order, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26/// `StreamTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamTopN {
29    pub base: PlanBase<Stream>,
30    core: generic::TopN<PlanRef>,
31}
32
33impl StreamTopN {
34    pub fn new(core: generic::TopN<PlanRef>) -> Self {
35        assert!(core.group_key.is_empty());
36        assert!(core.limit_attr.limit() > 0);
37        let input = &core.input;
38        let dist = match input.distribution() {
39            Distribution::Single => Distribution::Single,
40            _ => panic!(),
41        };
42        let watermark_columns = WatermarkColumns::new();
43
44        let base = PlanBase::new_stream_with_core(
45            &core,
46            dist,
47            false,
48            false,
49            watermark_columns,
50            MonotonicityMap::new(),
51        );
52        StreamTopN { base, core }
53    }
54
55    pub fn limit_attr(&self) -> TopNLimit {
56        self.core.limit_attr
57    }
58
59    pub fn offset(&self) -> u64 {
60        self.core.offset
61    }
62
63    pub fn topn_order(&self) -> &Order {
64        &self.core.order
65    }
66}
67
68impl Distill for StreamTopN {
69    fn distill<'a>(&self) -> XmlNode<'a> {
70        let name = plan_node_name!("StreamTopN",
71            { "append_only", self.input().append_only() },
72        );
73        self.core.distill_with_name(name)
74    }
75}
76
77impl PlanTreeNodeUnary for StreamTopN {
78    fn input(&self) -> PlanRef {
79        self.core.input.clone()
80    }
81
82    fn clone_with_input(&self, input: PlanRef) -> Self {
83        let mut core = self.core.clone();
84        core.input = input;
85        Self::new(core)
86    }
87}
88
89impl_plan_tree_node_for_unary! { StreamTopN }
90
91impl StreamNode for StreamTopN {
92    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
93        use risingwave_pb::stream_plan::*;
94
95        let input = self.input();
96        let topn_node = TopNNode {
97            limit: self.limit_attr().limit(),
98            offset: self.offset(),
99            with_ties: self.limit_attr().with_ties(),
100            table: Some(
101                self.core
102                    .infer_internal_table_catalog(
103                        input.schema(),
104                        input.ctx(),
105                        input.expect_stream_key(),
106                        None,
107                    )
108                    .with_id(state.gen_table_id_wrapped())
109                    .to_internal_table_prost(),
110            ),
111            order_by: self.topn_order().to_protobuf(),
112        };
113        if self.input().append_only() {
114            PbNodeBody::AppendOnlyTopN(Box::new(topn_node))
115        } else {
116            PbNodeBody::TopN(Box::new(topn_node))
117        }
118    }
119}
120
121impl ExprRewritable for StreamTopN {}
122
123impl ExprVisitable for StreamTopN {}