risingwave_batch_executors/executor/group_top_n.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::mem::swap;
17use std::sync::Arc;
18
19use futures_async_stream::try_stream;
20use hashbrown::HashMap;
21use itertools::Itertools;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bitmap::FilterByBitmap;
24use risingwave_common::catalog::Schema;
25use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
26use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
27use risingwave_common::types::DataType;
28use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::util::memcmp_encoding::encode_chunk;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_pb::batch_plan::plan_node::NodeBody;
33
34use super::top_n::{HeapElem, TopNHeap};
35use crate::error::{BatchError, Result};
36use crate::executor::{
37    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38};
39
40/// Group Top-N Executor
41///
42/// For each group, use a N-heap to store the smallest N rows.
43pub struct GroupTopNExecutor<K: HashKey> {
44    child: BoxedExecutor,
45    column_orders: Vec<ColumnOrder>,
46    offset: usize,
47    limit: usize,
48    group_key: Vec<usize>,
49    with_ties: bool,
50    schema: Schema,
51    identity: String,
52    chunk_size: usize,
53    mem_ctx: MemoryContext,
54    _phantom: PhantomData<K>,
55}
56
57pub struct GroupTopNExecutorBuilder {
58    child: BoxedExecutor,
59    column_orders: Vec<ColumnOrder>,
60    offset: usize,
61    limit: usize,
62    group_key: Vec<usize>,
63    group_key_types: Vec<DataType>,
64    with_ties: bool,
65    identity: String,
66    chunk_size: usize,
67    mem_ctx: MemoryContext,
68}
69
70impl HashKeyDispatcher for GroupTopNExecutorBuilder {
71    type Output = BoxedExecutor;
72
73    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
74        Box::new(GroupTopNExecutor::<K>::new(
75            self.child,
76            self.column_orders,
77            self.offset,
78            self.limit,
79            self.with_ties,
80            self.group_key,
81            self.identity,
82            self.chunk_size,
83            self.mem_ctx,
84        ))
85    }
86
87    fn data_types(&self) -> &[DataType] {
88        &self.group_key_types
89    }
90}
91
92impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
93    async fn new_boxed_executor(
94        source: &ExecutorBuilder<'_>,
95        inputs: Vec<BoxedExecutor>,
96    ) -> Result<BoxedExecutor> {
97        let [child]: [_; 1] = inputs.try_into().unwrap();
98
99        let top_n_node = try_match_expand!(
100            source.plan_node().get_node_body().unwrap(),
101            NodeBody::GroupTopN
102        )?;
103
104        let column_orders = top_n_node
105            .column_orders
106            .iter()
107            .map(ColumnOrder::from_protobuf)
108            .collect();
109
110        let group_key = top_n_node
111            .group_key
112            .iter()
113            .map(|x| *x as usize)
114            .collect_vec();
115        let child_schema = child.schema();
116        let group_key_types = group_key
117            .iter()
118            .map(|x| child_schema.fields[*x].data_type())
119            .collect();
120
121        let identity = source.plan_node().get_identity().clone();
122
123        let builder = Self {
124            child,
125            column_orders,
126            offset: top_n_node.get_offset() as usize,
127            limit: top_n_node.get_limit() as usize,
128            group_key,
129            group_key_types,
130            with_ties: top_n_node.get_with_ties(),
131            identity: identity.clone(),
132            chunk_size: source.context().get_config().developer.chunk_size,
133            mem_ctx: source.context().create_executor_mem_context(&identity),
134        };
135
136        Ok(builder.dispatch())
137    }
138}
139
140impl<K: HashKey> GroupTopNExecutor<K> {
141    pub fn new(
142        child: BoxedExecutor,
143        column_orders: Vec<ColumnOrder>,
144        offset: usize,
145        limit: usize,
146        with_ties: bool,
147        group_key: Vec<usize>,
148        identity: String,
149        chunk_size: usize,
150        mem_ctx: MemoryContext,
151    ) -> Self {
152        let schema = child.schema().clone();
153        Self {
154            child,
155            column_orders,
156            offset,
157            limit,
158            with_ties,
159            group_key,
160            schema,
161            identity,
162            chunk_size,
163            mem_ctx,
164            _phantom: PhantomData,
165        }
166    }
167}
168
169impl<K: HashKey> Executor for GroupTopNExecutor<K> {
170    fn schema(&self) -> &Schema {
171        &self.schema
172    }
173
174    fn identity(&self) -> &str {
175        &self.identity
176    }
177
178    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
179        self.do_execute()
180    }
181}
182
183impl<K: HashKey> GroupTopNExecutor<K> {
184    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
185    async fn do_execute(self: Box<Self>) {
186        if self.limit == 0 {
187            return Ok(());
188        }
189        let mut groups =
190            HashMap::<K, TopNHeap, PrecomputedBuildHasher, MonitoredGlobalAlloc>::with_hasher_in(
191                PrecomputedBuildHasher,
192                self.mem_ctx.global_allocator(),
193            );
194
195        #[for_await]
196        for chunk in self.child.execute() {
197            let chunk = Arc::new(chunk?);
198            let keys = K::build_many(self.group_key.as_slice(), &chunk);
199
200            for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)?
201                .into_iter()
202                .zip_eq_fast(keys.into_iter())
203                .enumerate()
204                .filter_by_bitmap(chunk.visibility())
205            {
206                let heap = groups.entry(key).or_insert_with(|| {
207                    TopNHeap::new(
208                        self.limit,
209                        self.offset,
210                        self.with_ties,
211                        self.mem_ctx.clone(),
212                    )
213                });
214                heap.push(HeapElem::new(encoded_row, chunk.row_at(row_id).0));
215            }
216        }
217
218        let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
219        for (_, h) in &mut groups {
220            let mut heap = TopNHeap::empty();
221            swap(&mut heap, h);
222            for ele in heap.dump() {
223                if let Some(spilled) = chunk_builder.append_one_row(ele.row()) {
224                    yield spilled
225                }
226            }
227        }
228        if let Some(spilled) = chunk_builder.consume_all() {
229            yield spilled
230        }
231    }
232}
233
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::metrics::LabelGuardedIntGauge;
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::util::sort_util::OrderType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    /// Mock child yielding one chunk of `(a, b, group)` Int32 rows.
    ///
    /// The third column splits the rows into two groups (values 1 and 2) of five rows each.
    fn mock_input() -> MockExecutor {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i i
             1 5 1
             2 4 1
             3 3 1
             4 2 1
             5 1 1
             1 6 2
             2 5 2
             3 4 2
             4 3 2
             5 2 2
             ",
        ));
        mock_executor
    }

    /// Ranks rows by column 1 ascending, breaking ties on column 0 ascending.
    fn column_orders() -> Vec<ColumnOrder> {
        vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ]
    }

    /// Group top-N over `mock_input()`, grouped by column 2, without ties.
    fn build_executor(offset: usize, limit: usize, mem_ctx: MemoryContext) -> BoxedExecutor {
        GroupTopNExecutorBuilder {
            child: Box::new(mock_input()),
            column_orders: column_orders(),
            offset,
            limit,
            with_ties: false,
            group_key: vec![2],
            group_key_types: vec![DataType::Int32],
            identity: "GroupTopNExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
            mem_ctx,
        }
        .dispatch()
    }

    #[tokio::test]
    async fn test_group_top_n_executor() {
        let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
        {
            let mem_ctx = MemoryContext::new(
                Some(parent_mem.clone()),
                LabelGuardedIntGauge::<4>::test_int_gauge(),
            );
            let top_n_executor = build_executor(1, 3, mem_ctx);

            // The output schema mirrors the child's: three Int32 columns.
            let fields = &top_n_executor.schema().fields;
            assert_eq!(fields.len(), 3);
            assert!(fields.iter().all(|f| f.data_type == DataType::Int32));

            let mut stream = top_n_executor.execute();
            let res = stream
                .next()
                .await
                .expect("executor should emit one chunk")
                .unwrap();

            // Groups are emitted in hash-map iteration order, so either group may come first.
            let group_1_first = DataChunk::from_pretty(
                "
                i i i
                4 2 1
                3 3 1
                2 4 1
                4 3 2
                3 4 2
                2 5 2
                ",
            );
            let group_2_first = DataChunk::from_pretty(
                "
                i i i
                4 3 2
                3 4 2
                2 5 2
                4 2 1
                3 3 1
                2 4 1
                ",
            );
            assert!(
                res == group_1_first || res == group_2_first,
                "unexpected group top-n output: {res:?}"
            );

            assert!(stream.next().await.is_none());
        }

        // Everything charged during execution must be released once the executor is dropped.
        assert_eq!(0, parent_mem.get_bytes_used());
    }

    #[tokio::test]
    async fn test_group_top_n_executor_zero_limit() {
        let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
        {
            let mem_ctx = MemoryContext::new(
                Some(parent_mem.clone()),
                LabelGuardedIntGauge::<4>::test_int_gauge(),
            );
            // `limit == 0` must yield no chunks at all, even though the input is non-empty.
            let mut stream = build_executor(0, 0, mem_ctx).execute();
            assert!(stream.next().await.is_none());
        }

        assert_eq!(0, parent_mem.get_bytes_used());
    }
}