risingwave_frontend/optimizer/plan_node/generic/
top_n.rs1use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{FieldDisplay, Schema};
19use risingwave_common::util::sort_util::OrderType;
20
21use super::super::utils::TableCatalogBuilder;
22use super::{DistillUnit, GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
23use crate::TableCatalog;
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::utils::childless_record;
27use crate::optimizer::property::{FunctionalDependencySet, Order, OrderDisplay};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct TopN<PlanRef> {
32 pub input: PlanRef,
33 pub limit_attr: TopNLimit,
34 pub offset: u64,
35 pub order: Order,
36 pub group_key: Vec<usize>,
37}
38
39impl TopN<StreamPlanRef> {
40 pub fn infer_internal_table_catalog(
43 &self,
44 schema: &Schema,
45 _ctx: OptimizerContextRef,
46 input_stream_key: &[usize],
47 vnode_col_idx: Option<usize>,
48 ) -> TableCatalog {
49 let columns_fields = schema.fields().to_vec();
50 let column_orders = &self.order.column_orders;
51 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
52 columns_fields.iter().for_each(|field| {
53 internal_table_catalog_builder.add_column(field);
54 });
55 let mut order_cols = HashSet::new();
56
57 self.group_key.iter().for_each(|&idx| {
63 internal_table_catalog_builder.add_order_column(idx, OrderType::ascending());
64 order_cols.insert(idx);
65 });
66 let read_prefix_len_hint = internal_table_catalog_builder.get_current_pk_len();
67
68 column_orders.iter().for_each(|order| {
69 internal_table_catalog_builder.add_order_column(order.column_index, order.order_type);
70 order_cols.insert(order.column_index);
71 });
72
73 input_stream_key.iter().for_each(|idx| {
74 if order_cols.insert(*idx) {
75 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
76 }
77 });
78 if let Some(vnode_col_idx) = vnode_col_idx {
79 internal_table_catalog_builder.set_vnode_col_idx(vnode_col_idx);
80 }
81
82 internal_table_catalog_builder.build(
83 self.input.distribution().dist_column_indices().to_vec(),
84 read_prefix_len_hint,
85 )
86 }
87}
88
89impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
90 pub fn decompose(self) -> (PlanRef, u64, u64, bool, Order, Vec<usize>) {
92 let (limit, with_ties) = match self.limit_attr {
93 TopNLimit::Simple(limit) => (limit, false),
94 TopNLimit::WithTies(limit) => (limit, true),
95 };
96 (
97 self.input,
98 limit,
99 self.offset,
100 with_ties,
101 self.order,
102 self.group_key,
103 )
104 }
105}
106
107impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
108 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> TopN<OtherPlanRef> {
109 TopN {
110 input,
111 limit_attr: self.limit_attr,
112 offset: self.offset,
113 order: self.order.clone(),
114 group_key: self.group_key.clone(),
115 }
116 }
117
118 pub fn with_group(
119 input: PlanRef,
120 limit_attr: TopNLimit,
121 offset: u64,
122 order: Order,
123 group_key: Vec<usize>,
124 ) -> Self {
125 if limit_attr.with_ties() {
126 assert!(offset == 0, "WITH TIES is not supported with OFFSET");
127 }
128
129 debug_assert!(
130 group_key.iter().all(|&idx| idx < input.schema().len()),
131 "Invalid group keys {:?} input schema size = {}",
132 &group_key,
133 input.schema().len()
134 );
135
136 Self {
137 input,
138 limit_attr,
139 offset,
140 order,
141 group_key,
142 }
143 }
144
145 pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self {
146 if limit_attr.with_ties() {
147 assert!(offset == 0, "WITH TIES is not supported with OFFSET");
148 }
149
150 Self {
151 input,
152 limit_attr,
153 offset,
154 order,
155 group_key: vec![],
156 }
157 }
158}
159
160impl<PlanRef: GenericPlanRef> DistillUnit for TopN<PlanRef> {
161 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
162 let mut vec = Vec::with_capacity(5);
163 let input_schema = self.input.schema();
164 let order_d = OrderDisplay {
165 order: &self.order,
166 input_schema,
167 };
168 vec.push(("order", order_d.distill()));
169 vec.push(("limit", Pretty::debug(&self.limit_attr.limit())));
170 vec.push(("offset", Pretty::debug(&self.offset)));
171 if self.limit_attr.with_ties() {
172 vec.push(("with_ties", Pretty::debug(&true)));
173 }
174 if !self.group_key.is_empty() {
175 let f = |i| Pretty::display(&FieldDisplay(&self.input.schema()[i]));
176 vec.push((
177 "group_key",
178 Pretty::Array(self.group_key.iter().copied().map(f).collect()),
179 ));
180 }
181 childless_record(name, vec)
182 }
183}
184
185impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
186 fn schema(&self) -> Schema {
187 self.input.schema().clone()
188 }
189
190 fn stream_key(&self) -> Option<Vec<usize>> {
191 let input_stream_key = self.input.stream_key()?;
192 let mut stream_key = self.group_key.clone();
193 if !self.limit_attr.max_one_row() {
194 for i in input_stream_key {
195 if !stream_key.contains(i) {
196 stream_key.push(*i);
197 }
198 }
199 }
200 Some(stream_key)
204 }
205
206 fn ctx(&self) -> OptimizerContextRef {
207 self.input.ctx()
208 }
209
210 fn functional_dependency(&self) -> FunctionalDependencySet {
211 self.input.functional_dependency().clone()
212 }
213}
214
/// Row limit of a TopN, carrying whether `WITH TIES` semantics apply.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TopNLimit {
    /// Plain limit of the given number of rows.
    Simple(u64),
    /// Limit of the given number of rows with `WITH TIES`; may yield more rows than the count.
    WithTies(u64),
}

impl TopNLimit {
    /// Builds a limit of `limit` rows, choosing the `WithTies` variant when `with_ties` is set.
    pub fn new(limit: u64, with_ties: bool) -> Self {
        match with_ties {
            true => Self::WithTies(limit),
            false => Self::Simple(limit),
        }
    }

    /// Returns the row count, regardless of variant.
    pub fn limit(&self) -> u64 {
        match *self {
            Self::Simple(n) | Self::WithTies(n) => n,
        }
    }

    /// Returns `true` for the `WithTies` variant.
    pub fn with_ties(&self) -> bool {
        matches!(*self, Self::WithTies(_))
    }

    /// Returns `true` only for `Simple(1)`.
    ///
    /// `WithTies` never qualifies, since ties may produce extra rows. `Simple(0)` also returns
    /// `false`.
    pub fn max_one_row(&self) -> bool {
        matches!(*self, Self::Simple(1))
    }
}