risingwave_frontend/optimizer/plan_node/generic/
top_n.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{FieldDisplay, Schema};
19use risingwave_common::util::sort_util::OrderType;
20
21use super::super::utils::TableCatalogBuilder;
22use super::{DistillUnit, GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
23use crate::TableCatalog;
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::utils::childless_record;
27use crate::optimizer::property::{FunctionalDependencySet, Order, OrderDisplay};
28
29/// `TopN` sorts the input data and fetches up to `limit` rows from `offset`
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct TopN<PlanRef> {
32    pub input: PlanRef,
33    pub limit_attr: TopNLimit,
34    pub offset: u64,
35    pub order: Order,
36    pub group_key: Vec<usize>,
37}
38
39impl TopN<StreamPlanRef> {
40    /// Infers the state table catalog for [`super::super::StreamTopN`] and
41    /// [`super::super::StreamGroupTopN`].
42    pub fn infer_internal_table_catalog(
43        &self,
44        schema: &Schema,
45        _ctx: OptimizerContextRef,
46        input_stream_key: &[usize],
47        vnode_col_idx: Option<usize>,
48    ) -> TableCatalog {
49        let columns_fields = schema.fields().to_vec();
50        let column_orders = &self.order.column_orders;
51        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
52        columns_fields.iter().for_each(|field| {
53            internal_table_catalog_builder.add_column(field);
54        });
55        let mut order_cols = HashSet::new();
56
57        // Here we want the state table to store the states in the order we want, firstly in
58        // ascending order by the columns specified by the group key, then by the columns
59        // specified by `order`. If we do that, when the later group topN operator
60        // does a prefix scanning with the group key, we can fetch the data in the
61        // desired order.
62        self.group_key.iter().for_each(|&idx| {
63            internal_table_catalog_builder.add_order_column(idx, OrderType::ascending());
64            order_cols.insert(idx);
65        });
66        let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
67
68        column_orders.iter().for_each(|order| {
69            internal_table_catalog_builder.add_order_column(order.column_index, order.order_type);
70            order_cols.insert(order.column_index);
71        });
72
73        input_stream_key.iter().for_each(|idx| {
74            if order_cols.insert(*idx) {
75                internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
76            }
77        });
78        if let Some(vnode_col_idx) = vnode_col_idx {
79            internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx);
80        }
81
82        internal_table_catalog_builder.build(
83            self.input.distribution().dist_column_indices().to_vec(),
84            read_prefix_len_hint,
85        )
86    }
87}
88
89impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
90    /// decompose -> (input, limit, offset, `with_ties`, order, `group_key`)
91    pub fn decompose(self) -> (PlanRef, u64, u64, bool, Order, Vec<usize>) {
92        let (limit, with_ties) = match self.limit_attr {
93            TopNLimit::Simple(limit) => (limit, false),
94            TopNLimit::WithTies(limit) => (limit, true),
95        };
96        (
97            self.input,
98            limit,
99            self.offset,
100            with_ties,
101            self.order,
102            self.group_key,
103        )
104    }
105}
106
107impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
108    pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> TopN<OtherPlanRef> {
109        TopN {
110            input,
111            limit_attr: self.limit_attr,
112            offset: self.offset,
113            order: self.order.clone(),
114            group_key: self.group_key.clone(),
115        }
116    }
117
118    pub fn with_group(
119        input: PlanRef,
120        limit_attr: TopNLimit,
121        offset: u64,
122        order: Order,
123        group_key: Vec<usize>,
124    ) -> Self {
125        if limit_attr.with_ties() {
126            assert!(offset == 0, "WITH TIES is not supported with OFFSET");
127        }
128
129        debug_assert!(
130            group_key.iter().all(|&idx| idx < input.schema().len()),
131            "Invalid group keys {:?} input schema size = {}",
132            &group_key,
133            input.schema().len()
134        );
135
136        Self {
137            input,
138            limit_attr,
139            offset,
140            order,
141            group_key,
142        }
143    }
144
145    pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self {
146        if limit_attr.with_ties() {
147            assert!(offset == 0, "WITH TIES is not supported with OFFSET");
148        }
149
150        Self {
151            input,
152            limit_attr,
153            offset,
154            order,
155            group_key: vec![],
156        }
157    }
158}
159
160impl<PlanRef: GenericPlanRef> DistillUnit for TopN<PlanRef> {
161    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
162        let mut vec = Vec::with_capacity(5);
163        let input_schema = self.input.schema();
164        let order_d = OrderDisplay {
165            order: &self.order,
166            input_schema,
167        };
168        vec.push(("order", order_d.distill()));
169        vec.push(("limit", Pretty::debug(&self.limit_attr.limit())));
170        vec.push(("offset", Pretty::debug(&self.offset)));
171        if self.limit_attr.with_ties() {
172            vec.push(("with_ties", Pretty::debug(&true)));
173        }
174        if !self.group_key.is_empty() {
175            let f = |i| Pretty::display(&FieldDisplay(&self.input.schema()[i]));
176            vec.push((
177                "group_key",
178                Pretty::Array(self.group_key.iter().copied().map(f).collect()),
179            ));
180        }
181        childless_record(name, vec)
182    }
183}
184
185impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
186    fn schema(&self) -> Schema {
187        self.input.schema().clone()
188    }
189
190    fn stream_key(&self) -> Option<Vec<usize>> {
191        let input_stream_key = self.input.stream_key()?;
192        let mut stream_key = self.group_key.clone();
193        if !self.limit_attr.max_one_row() {
194            for i in input_stream_key {
195                if !stream_key.contains(i) {
196                    stream_key.push(*i);
197                }
198            }
199        }
200        // else: We can use the group key as the stream key when there is at most one record for each
201        // value of the group key.
202
203        Some(stream_key)
204    }
205
206    fn ctx(&self) -> OptimizerContextRef {
207        self.input.ctx()
208    }
209
210    fn functional_dependency(&self) -> FunctionalDependencySet {
211        self.input.functional_dependency().clone()
212    }
213}
214
/// `TopNLimit` specifies the number of records to return and whether `WITH TIES` applies.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TopNLimit {
    /// Plain `LIMIT`: the number of records returned is bounded by the number after `LIMIT`
    /// in the SQL query.
    Simple(u64),
    /// `LIMIT ... WITH TIES`: all records with the same value are brought along, even if the
    /// number of records then exceeds the number specified after `LIMIT` in the query.
    WithTies(u64),
}

impl TopNLimit {
    /// Creates the variant matching `with_ties`, carrying `limit`.
    pub fn new(limit: u64, with_ties: bool) -> Self {
        match with_ties {
            true => Self::WithTies(limit),
            false => Self::Simple(limit),
        }
    }

    /// The number given after `LIMIT`, regardless of the variant.
    pub fn limit(&self) -> u64 {
        match *self {
            Self::Simple(n) | Self::WithTies(n) => n,
        }
    }

    /// Whether this is the `WITH TIES` variant.
    pub fn with_ties(&self) -> bool {
        matches!(self, Self::WithTies(_))
    }

    /// Whether this `TopNLimit` returns at most one record for each value. Only `LIMIT 1`
    /// without `WITH TIES` satisfies this condition.
    pub fn max_one_row(&self) -> bool {
        matches!(self, Self::Simple(1))
    }
}