// risingwave_frontend/optimizer/plan_node/stream_group_topn.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name, watermark_pretty};
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{MonotonicityMap, Order};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamGroupTopN {
32    pub base: PlanBase<Stream>,
33    core: generic::TopN<PlanRef>,
34    /// an optional column index which is the vnode of each row computed by the input's consistent
35    /// hash distribution
36    vnode_col_idx: Option<usize>,
37}
38
39impl StreamGroupTopN {
40    pub fn new(core: generic::TopN<PlanRef>, vnode_col_idx: Option<usize>) -> Result<Self> {
41        assert!(!core.group_key.is_empty());
42        assert!(core.limit_attr.limit() > 0);
43        reject_upsert_input!(core.input);
44
45        let input = &core.input;
46
47        // FIXME(rc): Actually only watermark messages on the first group-by column are propagated
48        // acccoring to the current GroupTopN implementation. This should be fixed.
49        let watermark_columns = if input.append_only() {
50            input.watermark_columns().clone()
51        } else {
52            input.watermark_columns().retain_clone(&core.group_key)
53        };
54
55        let mut stream_key = core
56            .stream_key()
57            .expect("logical node should have stream key here");
58        if let Some(vnode_col_idx) = vnode_col_idx
59            && stream_key.len() > 1
60        {
61            // The output stream key of `GroupTopN` is a union of group key and input stream key,
62            // while vnode is calculated from a subset of input stream key. So we can safely remove
63            // the vnode column from output stream key. While at meanwhile we cannot leave the stream key
64            // as empty, so we only remove it when stream key length is > 1.
65            stream_key.remove(stream_key.iter().position(|i| *i == vnode_col_idx).unwrap());
66        }
67
68        let base = PlanBase::new_stream(
69            core.ctx(),
70            core.schema(),
71            Some(stream_key),
72            core.functional_dependency(),
73            input.distribution().clone(),
74            StreamKind::Retract,
75            // TODO: https://github.com/risingwavelabs/risingwave/issues/8348
76            false,
77            watermark_columns,
78            MonotonicityMap::new(), // TODO: derive monotonicity
79        );
80
81        Ok(StreamGroupTopN {
82            base,
83            core,
84            vnode_col_idx,
85        })
86    }
87
88    pub fn limit_attr(&self) -> TopNLimit {
89        self.core.limit_attr
90    }
91
92    pub fn offset(&self) -> u64 {
93        self.core.offset
94    }
95
96    pub fn topn_order(&self) -> &Order {
97        &self.core.order
98    }
99
100    pub fn group_key(&self) -> &[usize] {
101        &self.core.group_key
102    }
103}
104
105impl StreamNode for StreamGroupTopN {
106    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
107        use risingwave_pb::stream_plan::*;
108
109        let input = self.input();
110        let table = self
111            .core
112            .infer_internal_table_catalog(
113                input.schema(),
114                input.ctx(),
115                input.expect_stream_key(),
116                self.vnode_col_idx,
117            )
118            .with_id(state.gen_table_id_wrapped());
119        assert!(!self.group_key().is_empty());
120        let group_topn_node = GroupTopNNode {
121            limit: self.limit_attr().limit(),
122            offset: self.offset(),
123            with_ties: self.limit_attr().with_ties(),
124            group_key: self.group_key().iter().map(|idx| *idx as u32).collect(),
125            table: Some(table.to_internal_table_prost()),
126            order_by: self.topn_order().to_protobuf(),
127        };
128        if self.input().append_only() {
129            PbNodeBody::AppendOnlyGroupTopN(Box::new(group_topn_node))
130        } else {
131            PbNodeBody::GroupTopN(Box::new(group_topn_node))
132        }
133    }
134}
135
136impl Distill for StreamGroupTopN {
137    fn distill<'a>(&self) -> XmlNode<'a> {
138        let name = plan_node_name!("StreamGroupTopN",
139            { "append_only", self.input().append_only() },
140        );
141        let mut node = self.core.distill_with_name(name);
142        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
143            node.fields.push(("output_watermarks".into(), ow));
144        }
145        node
146    }
147}
148
149impl_plan_tree_node_for_unary! { Stream, StreamGroupTopN }
150
151impl PlanTreeNodeUnary<Stream> for StreamGroupTopN {
152    fn input(&self) -> PlanRef {
153        self.core.input.clone()
154    }
155
156    fn clone_with_input(&self, input: PlanRef) -> Self {
157        let mut core = self.core.clone();
158        core.input = input;
159        Self::new(core, self.vnode_col_idx).unwrap()
160    }
161}
162
163impl ExprRewritable<Stream> for StreamGroupTopN {}
164
165impl ExprVisitable for StreamGroupTopN {}