risingwave_frontend/optimizer/plan_node/generic/
top_n.rs1use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{FieldDisplay, Schema};
19use risingwave_common::util::sort_util::OrderType;
20
21use super::super::utils::TableCatalogBuilder;
22use super::{DistillUnit, GenericPlanNode, GenericPlanRef, stream};
23use crate::TableCatalog;
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::utils::childless_record;
26use crate::optimizer::property::{FunctionalDependencySet, Order, OrderDisplay};
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct TopN<PlanRef> {
31 pub input: PlanRef,
32 pub limit_attr: TopNLimit,
33 pub offset: u64,
34 pub order: Order,
35 pub group_key: Vec<usize>,
36}
37
38impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
39 pub fn infer_internal_table_catalog(
42 &self,
43 schema: &Schema,
44 _ctx: OptimizerContextRef,
45 input_stream_key: &[usize],
46 vnode_col_idx: Option<usize>,
47 ) -> TableCatalog {
48 let columns_fields = schema.fields().to_vec();
49 let column_orders = &self.order.column_orders;
50 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
51 columns_fields.iter().for_each(|field| {
52 internal_table_catalog_builder.add_column(field);
53 });
54 let mut order_cols = HashSet::new();
55
56 self.group_key.iter().for_each(|&idx| {
62 internal_table_catalog_builder.add_order_column(idx, OrderType::ascending());
63 order_cols.insert(idx);
64 });
65 let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
66
67 column_orders.iter().for_each(|order| {
68 internal_table_catalog_builder.add_order_column(order.column_index, order.order_type);
69 order_cols.insert(order.column_index);
70 });
71
72 input_stream_key.iter().for_each(|idx| {
73 if order_cols.insert(*idx) {
74 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
75 }
76 });
77 if let Some(vnode_col_idx) = vnode_col_idx {
78 internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx);
79 }
80
81 internal_table_catalog_builder.build(
82 self.input.distribution().dist_column_indices().to_vec(),
83 read_prefix_len_hint,
84 )
85 }
86
87 pub fn decompose(self) -> (PlanRef, u64, u64, bool, Order, Vec<usize>) {
89 let (limit, with_ties) = match self.limit_attr {
90 TopNLimit::Simple(limit) => (limit, false),
91 TopNLimit::WithTies(limit) => (limit, true),
92 };
93 (
94 self.input,
95 limit,
96 self.offset,
97 with_ties,
98 self.order,
99 self.group_key,
100 )
101 }
102}
103
104impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
105 pub fn with_group(
106 input: PlanRef,
107 limit_attr: TopNLimit,
108 offset: u64,
109 order: Order,
110 group_key: Vec<usize>,
111 ) -> Self {
112 if limit_attr.with_ties() {
113 assert!(offset == 0, "WITH TIES is not supported with OFFSET");
114 }
115
116 debug_assert!(
117 group_key.iter().all(|&idx| idx < input.schema().len()),
118 "Invalid group keys {:?} input schema size = {}",
119 &group_key,
120 input.schema().len()
121 );
122
123 Self {
124 input,
125 limit_attr,
126 offset,
127 order,
128 group_key,
129 }
130 }
131
132 pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self {
133 if limit_attr.with_ties() {
134 assert!(offset == 0, "WITH TIES is not supported with OFFSET");
135 }
136
137 Self {
138 input,
139 limit_attr,
140 offset,
141 order,
142 group_key: vec![],
143 }
144 }
145}
146
147impl<PlanRef: GenericPlanRef> DistillUnit for TopN<PlanRef> {
148 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
149 let mut vec = Vec::with_capacity(5);
150 let input_schema = self.input.schema();
151 let order_d = OrderDisplay {
152 order: &self.order,
153 input_schema,
154 };
155 vec.push(("order", order_d.distill()));
156 vec.push(("limit", Pretty::debug(&self.limit_attr.limit())));
157 vec.push(("offset", Pretty::debug(&self.offset)));
158 if self.limit_attr.with_ties() {
159 vec.push(("with_ties", Pretty::debug(&true)));
160 }
161 if !self.group_key.is_empty() {
162 let f = |i| Pretty::display(&FieldDisplay(&self.input.schema()[i]));
163 vec.push((
164 "group_key",
165 Pretty::Array(self.group_key.iter().copied().map(f).collect()),
166 ));
167 }
168 childless_record(name, vec)
169 }
170}
171
172impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
173 fn schema(&self) -> Schema {
174 self.input.schema().clone()
175 }
176
177 fn stream_key(&self) -> Option<Vec<usize>> {
178 let input_stream_key = self.input.stream_key()?;
179 let mut stream_key = self.group_key.clone();
180 if !self.limit_attr.max_one_row() {
181 for i in input_stream_key {
182 if !stream_key.contains(i) {
183 stream_key.push(*i);
184 }
185 }
186 }
187 Some(stream_key)
191 }
192
193 fn ctx(&self) -> OptimizerContextRef {
194 self.input.ctx()
195 }
196
197 fn functional_dependency(&self) -> FunctionalDependencySet {
198 self.input.functional_dependency().clone()
199 }
200}
201
/// Row limit of a top-N, distinguishing a plain limit from one with `WITH TIES`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TopNLimit {
    /// A plain limit of the given number of rows.
    Simple(u64),
    /// A limit of the given number of rows that carries `WITH TIES`
    /// (presumably the SQL `FETCH ... WITH TIES` semantics; confirm against the
    /// executor).
    WithTies(u64),
}
213
214impl TopNLimit {
215 pub fn new(limit: u64, with_ties: bool) -> Self {
216 if with_ties {
217 Self::WithTies(limit)
218 } else {
219 Self::Simple(limit)
220 }
221 }
222
223 pub fn limit(&self) -> u64 {
224 match self {
225 TopNLimit::Simple(limit) => *limit,
226 TopNLimit::WithTies(limit) => *limit,
227 }
228 }
229
230 pub fn with_ties(&self) -> bool {
231 match self {
232 TopNLimit::Simple(_) => false,
233 TopNLimit::WithTies(_) => true,
234 }
235 }
236
237 pub fn max_one_row(&self) -> bool {
240 match self {
241 TopNLimit::Simple(limit) => *limit == 1,
242 TopNLimit::WithTies(_) => false,
243 }
244 }
245}