risingwave_frontend/optimizer/plan_node/
stream_group_topn.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
22use crate::PlanRef;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanNode;
25use crate::optimizer::property::{MonotonicityMap, Order};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct StreamGroupTopN {
30    pub base: PlanBase<Stream>,
31    core: generic::TopN<PlanRef>,
32    /// an optional column index which is the vnode of each row computed by the input's consistent
33    /// hash distribution
34    vnode_col_idx: Option<usize>,
35}
36
37impl StreamGroupTopN {
38    pub fn new(core: generic::TopN<PlanRef>, vnode_col_idx: Option<usize>) -> Self {
39        assert!(!core.group_key.is_empty());
40        assert!(core.limit_attr.limit() > 0);
41        let input = &core.input;
42
43        // FIXME(rc): Actually only watermark messages on the first group-by column are propagated
44        // acccoring to the current GroupTopN implementation. This should be fixed.
45        let watermark_columns = if input.append_only() {
46            input.watermark_columns().clone()
47        } else {
48            input.watermark_columns().retain_clone(&core.group_key)
49        };
50
51        let mut stream_key = core
52            .stream_key()
53            .expect("logical node should have stream key here");
54        if let Some(vnode_col_idx) = vnode_col_idx
55            && stream_key.len() > 1
56        {
57            // The output stream key of `GroupTopN` is a union of group key and input stream key,
58            // while vnode is calculated from a subset of input stream key. So we can safely remove
59            // the vnode column from output stream key. While at meanwhile we cannot leave the stream key
60            // as empty, so we only remove it when stream key length is > 1.
61            stream_key.remove(stream_key.iter().position(|i| *i == vnode_col_idx).unwrap());
62        }
63
64        let base = PlanBase::new_stream(
65            core.ctx(),
66            core.schema(),
67            Some(stream_key),
68            core.functional_dependency(),
69            input.distribution().clone(),
70            false,
71            // TODO: https://github.com/risingwavelabs/risingwave/issues/8348
72            false,
73            watermark_columns,
74            MonotonicityMap::new(), // TODO: derive monotonicity
75        );
76        StreamGroupTopN {
77            base,
78            core,
79            vnode_col_idx,
80        }
81    }
82
83    pub fn limit_attr(&self) -> TopNLimit {
84        self.core.limit_attr
85    }
86
87    pub fn offset(&self) -> u64 {
88        self.core.offset
89    }
90
91    pub fn topn_order(&self) -> &Order {
92        &self.core.order
93    }
94
95    pub fn group_key(&self) -> &[usize] {
96        &self.core.group_key
97    }
98}
99
100impl StreamNode for StreamGroupTopN {
101    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
102        use risingwave_pb::stream_plan::*;
103
104        let input = self.input();
105        let table = self
106            .core
107            .infer_internal_table_catalog(
108                input.schema(),
109                input.ctx(),
110                input.expect_stream_key(),
111                self.vnode_col_idx,
112            )
113            .with_id(state.gen_table_id_wrapped());
114        assert!(!self.group_key().is_empty());
115        let group_topn_node = GroupTopNNode {
116            limit: self.limit_attr().limit(),
117            offset: self.offset(),
118            with_ties: self.limit_attr().with_ties(),
119            group_key: self.group_key().iter().map(|idx| *idx as u32).collect(),
120            table: Some(table.to_internal_table_prost()),
121            order_by: self.topn_order().to_protobuf(),
122        };
123        if self.input().append_only() {
124            PbNodeBody::AppendOnlyGroupTopN(Box::new(group_topn_node))
125        } else {
126            PbNodeBody::GroupTopN(Box::new(group_topn_node))
127        }
128    }
129}
130
131impl Distill for StreamGroupTopN {
132    fn distill<'a>(&self) -> XmlNode<'a> {
133        let name = plan_node_name!("StreamGroupTopN",
134            { "append_only", self.input().append_only() },
135        );
136        let mut node = self.core.distill_with_name(name);
137        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
138            node.fields.push(("output_watermarks".into(), ow));
139        }
140        node
141    }
142}
143
144impl_plan_tree_node_for_unary! { StreamGroupTopN }
145
146impl PlanTreeNodeUnary for StreamGroupTopN {
147    fn input(&self) -> PlanRef {
148        self.core.input.clone()
149    }
150
151    fn clone_with_input(&self, input: PlanRef) -> Self {
152        let mut core = self.core.clone();
153        core.input = input;
154        Self::new(core, self.vnode_col_idx)
155    }
156}
157
158impl ExprRewritable for StreamGroupTopN {}
159
160impl ExprVisitable for StreamGroupTopN {}