risingwave_frontend/optimizer/plan_node/generic/top_n.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{FieldDisplay, Schema};
19use risingwave_common::util::sort_util::OrderType;
20
21use super::super::utils::TableCatalogBuilder;
22use super::{DistillUnit, GenericPlanNode, GenericPlanRef, stream};
23use crate::TableCatalog;
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::utils::childless_record;
26use crate::optimizer::property::{FunctionalDependencySet, Order, OrderDisplay};
27
28/// `TopN` sorts the input data and fetches up to `limit` rows from `offset`
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct TopN<PlanRef> {
31    pub input: PlanRef,
32    pub limit_attr: TopNLimit,
33    pub offset: u64,
34    pub order: Order,
35    pub group_key: Vec<usize>,
36}
37
38impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
39    /// Infers the state table catalog for [`super::super::StreamTopN`] and
40    /// [`super::super::StreamGroupTopN`].
41    pub fn infer_internal_table_catalog(
42        &self,
43        schema: &Schema,
44        _ctx: OptimizerContextRef,
45        input_stream_key: &[usize],
46        vnode_col_idx: Option<usize>,
47    ) -> TableCatalog {
48        let columns_fields = schema.fields().to_vec();
49        let column_orders = &self.order.column_orders;
50        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
51        columns_fields.iter().for_each(|field| {
52            internal_table_catalog_builder.add_column(field);
53        });
54        let mut order_cols = HashSet::new();
55
56        // Here we want the state table to store the states in the order we want, firstly in
57        // ascending order by the columns specified by the group key, then by the columns
58        // specified by `order`. If we do that, when the later group topN operator
59        // does a prefix scanning with the group key, we can fetch the data in the
60        // desired order.
61        self.group_key.iter().for_each(|&idx| {
62            internal_table_catalog_builder.add_order_column(idx, OrderType::ascending());
63            order_cols.insert(idx);
64        });
65        let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
66
67        column_orders.iter().for_each(|order| {
68            internal_table_catalog_builder.add_order_column(order.column_index, order.order_type);
69            order_cols.insert(order.column_index);
70        });
71
72        input_stream_key.iter().for_each(|idx| {
73            if order_cols.insert(*idx) {
74                internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
75            }
76        });
77        if let Some(vnode_col_idx) = vnode_col_idx {
78            internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx);
79        }
80
81        internal_table_catalog_builder.build(
82            self.input.distribution().dist_column_indices().to_vec(),
83            read_prefix_len_hint,
84        )
85    }
86
87    /// decompose -> (input, limit, offset, `with_ties`, order, `group_key`)
88    pub fn decompose(self) -> (PlanRef, u64, u64, bool, Order, Vec<usize>) {
89        let (limit, with_ties) = match self.limit_attr {
90            TopNLimit::Simple(limit) => (limit, false),
91            TopNLimit::WithTies(limit) => (limit, true),
92        };
93        (
94            self.input,
95            limit,
96            self.offset,
97            with_ties,
98            self.order,
99            self.group_key,
100        )
101    }
102}
103
104impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
105    pub fn with_group(
106        input: PlanRef,
107        limit_attr: TopNLimit,
108        offset: u64,
109        order: Order,
110        group_key: Vec<usize>,
111    ) -> Self {
112        if limit_attr.with_ties() {
113            assert!(offset == 0, "WITH TIES is not supported with OFFSET");
114        }
115
116        debug_assert!(
117            group_key.iter().all(|&idx| idx < input.schema().len()),
118            "Invalid group keys {:?} input schema size = {}",
119            &group_key,
120            input.schema().len()
121        );
122
123        Self {
124            input,
125            limit_attr,
126            offset,
127            order,
128            group_key,
129        }
130    }
131
132    pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self {
133        if limit_attr.with_ties() {
134            assert!(offset == 0, "WITH TIES is not supported with OFFSET");
135        }
136
137        Self {
138            input,
139            limit_attr,
140            offset,
141            order,
142            group_key: vec![],
143        }
144    }
145}
146
147impl<PlanRef: GenericPlanRef> DistillUnit for TopN<PlanRef> {
148    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
149        let mut vec = Vec::with_capacity(5);
150        let input_schema = self.input.schema();
151        let order_d = OrderDisplay {
152            order: &self.order,
153            input_schema,
154        };
155        vec.push(("order", order_d.distill()));
156        vec.push(("limit", Pretty::debug(&self.limit_attr.limit())));
157        vec.push(("offset", Pretty::debug(&self.offset)));
158        if self.limit_attr.with_ties() {
159            vec.push(("with_ties", Pretty::debug(&true)));
160        }
161        if !self.group_key.is_empty() {
162            let f = |i| Pretty::display(&FieldDisplay(&self.input.schema()[i]));
163            vec.push((
164                "group_key",
165                Pretty::Array(self.group_key.iter().copied().map(f).collect()),
166            ));
167        }
168        childless_record(name, vec)
169    }
170}
171
172impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
173    fn schema(&self) -> Schema {
174        self.input.schema().clone()
175    }
176
177    fn stream_key(&self) -> Option<Vec<usize>> {
178        let input_stream_key = self.input.stream_key()?;
179        let mut stream_key = self.group_key.clone();
180        if !self.limit_attr.max_one_row() {
181            for i in input_stream_key {
182                if !stream_key.contains(i) {
183                    stream_key.push(*i);
184                }
185            }
186        }
187        // else: We can use the group key as the stream key when there is at most one record for each
188        // value of the group key.
189
190        Some(stream_key)
191    }
192
193    fn ctx(&self) -> OptimizerContextRef {
194        self.input.ctx()
195    }
196
197    fn functional_dependency(&self) -> FunctionalDependencySet {
198        self.input.functional_dependency().clone()
199    }
200}
201
/// `TopNLimit` specifies how many records a `TopN` returns.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TopNLimit {
    /// Plain `LIMIT n`: at most `n` records are returned.
    Simple(u64),
    /// `LIMIT n WITH TIES`: the first `n` records are returned, plus every record tied with
    /// the last of them. The result can therefore exceed `n`.
    WithTies(u64),
}

impl TopNLimit {
    /// Builds a limit of `limit` records, with ties included when `with_ties` is `true`.
    pub fn new(limit: u64, with_ties: bool) -> Self {
        match with_ties {
            true => Self::WithTies(limit),
            false => Self::Simple(limit),
        }
    }

    /// Returns the record count given after `LIMIT`, whatever the variant.
    pub fn limit(&self) -> u64 {
        match *self {
            Self::Simple(count) | Self::WithTies(count) => count,
        }
    }

    /// Returns `true` for the `WITH TIES` variant.
    pub fn with_ties(&self) -> bool {
        matches!(*self, Self::WithTies(_))
    }

    /// Returns `true` if at most one record can be returned per value (per group).
    ///
    /// Only `LIMIT 1` without `WITH TIES` satisfies this condition.
    pub fn max_one_row(&self) -> bool {
        matches!(*self, Self::Simple(1))
    }
}