risingwave_frontend/optimizer/plan_node/
stream_topn.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::assert_matches::assert_matches;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::{DistillUnit, TopNLimit};
21use super::stream::prelude::*;
22use super::utils::{Distill, plan_node_name};
23use super::{
24    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
25};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, MonotonicityMap, Order, WatermarkColumns};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30/// `StreamTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamTopN {
33    pub base: PlanBase<Stream>,
34    core: generic::TopN<PlanRef>,
35}
36
37impl StreamTopN {
38    pub fn new(core: generic::TopN<PlanRef>) -> Result<Self> {
39        assert!(core.group_key.is_empty());
40        assert!(core.limit_attr.limit() > 0);
41        let input = &core.input;
42        assert_matches!(input.distribution(), Distribution::Single);
43        reject_upsert_input!(input);
44        let watermark_columns = WatermarkColumns::new();
45
46        let base = PlanBase::new_stream_with_core(
47            &core,
48            Distribution::Single,
49            StreamKind::Retract,
50            false,
51            watermark_columns,
52            MonotonicityMap::new(),
53        );
54
55        Ok(StreamTopN { base, core })
56    }
57
58    pub fn limit_attr(&self) -> TopNLimit {
59        self.core.limit_attr
60    }
61
62    pub fn offset(&self) -> u64 {
63        self.core.offset
64    }
65
66    pub fn topn_order(&self) -> &Order {
67        &self.core.order
68    }
69}
70
71impl Distill for StreamTopN {
72    fn distill<'a>(&self) -> XmlNode<'a> {
73        let name = plan_node_name!("StreamTopN",
74            { "append_only", self.input().append_only() },
75        );
76        self.core.distill_with_name(name)
77    }
78}
79
80impl PlanTreeNodeUnary<Stream> for StreamTopN {
81    fn input(&self) -> PlanRef {
82        self.core.input.clone()
83    }
84
85    fn clone_with_input(&self, input: PlanRef) -> Self {
86        let mut core = self.core.clone();
87        core.input = input;
88        Self::new(core).unwrap()
89    }
90}
91
92impl_plan_tree_node_for_unary! { Stream, StreamTopN }
93
94impl StreamNode for StreamTopN {
95    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
96        use risingwave_pb::stream_plan::*;
97
98        let input = self.input();
99        let topn_node = TopNNode {
100            limit: self.limit_attr().limit(),
101            offset: self.offset(),
102            with_ties: self.limit_attr().with_ties(),
103            table: Some(
104                self.core
105                    .infer_internal_table_catalog(
106                        input.schema(),
107                        input.ctx(),
108                        input.expect_stream_key(),
109                        None,
110                    )
111                    .with_id(state.gen_table_id_wrapped())
112                    .to_internal_table_prost(),
113            ),
114            order_by: self.topn_order().to_protobuf(),
115        };
116        if self.input().append_only() {
117            PbNodeBody::AppendOnlyTopN(Box::new(topn_node))
118        } else {
119            PbNodeBody::TopN(Box::new(topn_node))
120        }
121    }
122}
123
124impl ExprRewritable<Stream> for StreamTopN {}
125
126impl ExprVisitable for StreamTopN {}