risingwave_batch_executors/executor/
group_top_n.rs1use std::marker::PhantomData;
16use std::mem::swap;
17use std::sync::Arc;
18
19use futures_async_stream::try_stream;
20use hashbrown::HashMap;
21use itertools::Itertools;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bitmap::FilterByBitmap;
24use risingwave_common::catalog::Schema;
25use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
26use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
27use risingwave_common::types::DataType;
28use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::util::memcmp_encoding::encode_chunk;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_pb::batch_plan::plan_node::NodeBody;
33
34use super::top_n::{HeapElem, TopNHeap};
35use crate::error::{BatchError, Result};
36use crate::executor::{
37 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38};
39
40pub struct GroupTopNExecutor<K: HashKey> {
44 child: BoxedExecutor,
45 column_orders: Vec<ColumnOrder>,
46 offset: usize,
47 limit: usize,
48 group_key: Vec<usize>,
49 with_ties: bool,
50 schema: Schema,
51 identity: String,
52 chunk_size: usize,
53 mem_ctx: MemoryContext,
54 _phantom: PhantomData<K>,
55}
56
57pub struct GroupTopNExecutorBuilder {
58 child: BoxedExecutor,
59 column_orders: Vec<ColumnOrder>,
60 offset: usize,
61 limit: usize,
62 group_key: Vec<usize>,
63 group_key_types: Vec<DataType>,
64 with_ties: bool,
65 identity: String,
66 chunk_size: usize,
67 mem_ctx: MemoryContext,
68}
69
70impl HashKeyDispatcher for GroupTopNExecutorBuilder {
71 type Output = BoxedExecutor;
72
73 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
74 Box::new(GroupTopNExecutor::<K>::new(
75 self.child,
76 self.column_orders,
77 self.offset,
78 self.limit,
79 self.with_ties,
80 self.group_key,
81 self.identity,
82 self.chunk_size,
83 self.mem_ctx,
84 ))
85 }
86
87 fn data_types(&self) -> &[DataType] {
88 &self.group_key_types
89 }
90}
91
92impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
93 async fn new_boxed_executor(
94 source: &ExecutorBuilder<'_>,
95 inputs: Vec<BoxedExecutor>,
96 ) -> Result<BoxedExecutor> {
97 let [child]: [_; 1] = inputs.try_into().unwrap();
98
99 let top_n_node = try_match_expand!(
100 source.plan_node().get_node_body().unwrap(),
101 NodeBody::GroupTopN
102 )?;
103
104 let column_orders = top_n_node
105 .column_orders
106 .iter()
107 .map(ColumnOrder::from_protobuf)
108 .collect();
109
110 let group_key = top_n_node
111 .group_key
112 .iter()
113 .map(|x| *x as usize)
114 .collect_vec();
115 let child_schema = child.schema();
116 let group_key_types = group_key
117 .iter()
118 .map(|x| child_schema.fields[*x].data_type())
119 .collect();
120
121 let identity = source.plan_node().get_identity().clone();
122
123 let builder = Self {
124 child,
125 column_orders,
126 offset: top_n_node.get_offset() as usize,
127 limit: top_n_node.get_limit() as usize,
128 group_key,
129 group_key_types,
130 with_ties: top_n_node.get_with_ties(),
131 identity: identity.clone(),
132 chunk_size: source.context().get_config().developer.chunk_size,
133 mem_ctx: source.context().create_executor_mem_context(&identity),
134 };
135
136 Ok(builder.dispatch())
137 }
138}
139
140impl<K: HashKey> GroupTopNExecutor<K> {
141 pub fn new(
142 child: BoxedExecutor,
143 column_orders: Vec<ColumnOrder>,
144 offset: usize,
145 limit: usize,
146 with_ties: bool,
147 group_key: Vec<usize>,
148 identity: String,
149 chunk_size: usize,
150 mem_ctx: MemoryContext,
151 ) -> Self {
152 let schema = child.schema().clone();
153 Self {
154 child,
155 column_orders,
156 offset,
157 limit,
158 with_ties,
159 group_key,
160 schema,
161 identity,
162 chunk_size,
163 mem_ctx,
164 _phantom: PhantomData,
165 }
166 }
167}
168
169impl<K: HashKey> Executor for GroupTopNExecutor<K> {
170 fn schema(&self) -> &Schema {
171 &self.schema
172 }
173
174 fn identity(&self) -> &str {
175 &self.identity
176 }
177
178 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
179 self.do_execute()
180 }
181}
182
183impl<K: HashKey> GroupTopNExecutor<K> {
184 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
185 async fn do_execute(self: Box<Self>) {
186 if self.limit == 0 {
187 return Ok(());
188 }
189 let mut groups =
190 HashMap::<K, TopNHeap, PrecomputedBuildHasher, MonitoredGlobalAlloc>::with_hasher_in(
191 PrecomputedBuildHasher,
192 self.mem_ctx.global_allocator(),
193 );
194
195 #[for_await]
196 for chunk in self.child.execute() {
197 let chunk = Arc::new(chunk?);
198 let keys = K::build_many(self.group_key.as_slice(), &chunk);
199
200 for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)?
201 .into_iter()
202 .zip_eq_fast(keys.into_iter())
203 .enumerate()
204 .filter_by_bitmap(chunk.visibility())
205 {
206 let heap = groups.entry(key).or_insert_with(|| {
207 TopNHeap::new(
208 self.limit,
209 self.offset,
210 self.with_ties,
211 self.mem_ctx.clone(),
212 )
213 });
214 heap.push(HeapElem::new(encoded_row, chunk.row_at(row_id).0));
215 }
216 }
217
218 let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
219 for (_, h) in &mut groups {
220 let mut heap = TopNHeap::empty();
221 swap(&mut heap, h);
222 for ele in heap.dump() {
223 if let Some(spilled) = chunk_builder.append_one_row(ele.row()) {
224 yield spilled
225 }
226 }
227 }
228 if let Some(spilled) = chunk_builder.consume_all() {
229 yield spilled
230 }
231 }
232}
233
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::metrics::LabelGuardedIntGauge;
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::util::sort_util::OrderType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    /// Groups by column 2 and, within each group ordered by `(col1, col0)` ascending,
    /// keeps the rows ranked 2nd to 4th (offset 1, limit 3). Afterwards it checks that
    /// all memory charged through the executor's memory context has been released.
    #[tokio::test]
    async fn test_group_top_n_executor() {
        let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
        {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Int32),
                ],
            };
            let mut mock_executor = MockExecutor::new(schema);
            mock_executor.add(DataChunk::from_pretty(
                "i i i
                 1 5 1
                 2 4 1
                 3 3 1
                 4 2 1
                 5 1 1
                 1 6 2
                 2 5 2
                 3 4 2
                 4 3 2
                 5 2 2
                ",
            ));
            let column_orders = vec![
                ColumnOrder {
                    column_index: 1,
                    order_type: OrderType::ascending(),
                },
                ColumnOrder {
                    column_index: 0,
                    order_type: OrderType::ascending(),
                },
            ];
            let mem_ctx = MemoryContext::new(
                Some(parent_mem.clone()),
                LabelGuardedIntGauge::<4>::test_int_gauge(),
            );
            let top_n_executor = (GroupTopNExecutorBuilder {
                child: Box::new(mock_executor),
                column_orders,
                offset: 1,
                limit: 3,
                with_ties: false,
                group_key: vec![2],
                group_key_types: vec![DataType::Int32],
                identity: "GroupTopNExecutor".to_owned(),
                chunk_size: CHUNK_SIZE,
                mem_ctx,
            })
            .dispatch();

            // The output schema must mirror the child's: three Int32 columns.
            let fields = &top_n_executor.schema().fields;
            assert_eq!(fields.len(), 3);
            assert_eq!(fields[0].data_type, DataType::Int32);
            assert_eq!(fields[1].data_type, DataType::Int32);
            assert_eq!(fields[2].data_type, DataType::Int32);

            let mut stream = top_n_executor.execute();
            let res = stream
                .next()
                .await
                .expect("group top-n should emit one chunk")
                .unwrap();

            // Groups come out in hash-map order, so either group may be emitted first.
            let group_1_first = DataChunk::from_pretty(
                "i i i
                 4 2 1
                 3 3 1
                 2 4 1
                 4 3 2
                 3 4 2
                 2 5 2
                ",
            );
            let group_2_first = DataChunk::from_pretty(
                "i i i
                 4 3 2
                 3 4 2
                 2 5 2
                 4 2 1
                 3 3 1
                 2 4 1
                ",
            );
            assert!(res == group_1_first || res == group_2_first);

            assert!(stream.next().await.is_none());
        }

        assert_eq!(0, parent_mem.get_bytes_used());
    }
}
343}