risingwave_batch_executors/executor/limit.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::min;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::Schema;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{
26    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
27};
28
29/// Limit executor.
30pub struct LimitExecutor {
31    child: BoxedExecutor,
32    /// limit parameter
33    limit: usize,
34    /// offset parameter
35    offset: usize,
36    /// Identity string of the executor
37    identity: String,
38}
39
40impl BoxedExecutorBuilder for LimitExecutor {
41    async fn new_boxed_executor(
42        source: &ExecutorBuilder<'_>,
43        inputs: Vec<BoxedExecutor>,
44    ) -> Result<BoxedExecutor> {
45        let [child]: [_; 1] = inputs.try_into().unwrap();
46
47        let limit_node =
48            try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Limit)?;
49
50        let limit = limit_node.get_limit() as usize;
51        let offset = limit_node.get_offset() as usize;
52
53        Ok(Box::new(Self::new(
54            child,
55            limit,
56            offset,
57            source.plan_node().get_identity().clone(),
58        )))
59    }
60}
61
62impl LimitExecutor {
63    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
64    async fn do_execute(self: Box<Self>) {
65        if self.limit == 0 {
66            return Ok(());
67        }
68        // the number of rows have been skipped due to offset
69        let mut skipped = 0;
70        // the number of rows have been returned as execute result
71        let mut returned = 0;
72
73        #[for_await]
74        for data_chunk in self.child.execute() {
75            if returned == self.limit {
76                break;
77            }
78            let data_chunk = data_chunk?;
79            let cardinality = data_chunk.cardinality();
80            if cardinality + skipped <= self.offset {
81                skipped += cardinality;
82                continue;
83            }
84
85            if skipped == self.offset && cardinality + returned <= self.limit {
86                returned += cardinality;
87                yield data_chunk;
88                continue;
89            }
90            // process chunk
91            let mut new_vis;
92            if !data_chunk.is_compacted() {
93                new_vis = data_chunk.visibility().iter().collect_vec();
94                for vis in new_vis.iter_mut().filter(|x| **x) {
95                    if skipped < self.offset {
96                        skipped += 1;
97                        *vis = false;
98                    } else if returned < self.limit {
99                        returned += 1;
100                    } else {
101                        *vis = false;
102                    }
103                }
104            } else {
105                let chunk_size = data_chunk.capacity();
106                new_vis = vec![false; chunk_size];
107                let l = self.offset - skipped;
108                let r = min(l + self.limit - returned, chunk_size);
109                new_vis[l..r].fill(true);
110                returned += r - l;
111                skipped += l;
112            }
113            yield data_chunk
114                .with_visibility(new_vis.into_iter().collect::<Bitmap>())
115                .compact();
116        }
117    }
118}
119
120impl Executor for LimitExecutor {
121    fn schema(&self) -> &Schema {
122        self.child.schema()
123    }
124
125    fn identity(&self) -> &str {
126        &self.identity
127    }
128
129    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
130        self.do_execute()
131    }
132}
133
134impl LimitExecutor {
135    pub fn new(child: BoxedExecutor, limit: usize, offset: usize, identity: String) -> Self {
136        Self {
137            child,
138            limit,
139            offset,
140            identity,
141        }
142    }
143}
144
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;
    use risingwave_common::array::{Array, ArrayRef, BoolArray, PrimitiveArray};
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    fn create_column(values: &[Option<i32>]) -> ArrayRef {
        PrimitiveArray::from_iter(values).into_ref()
    }

    /// Builds an int32 column holding the values `0..row_num`.
    fn sequence_column(row_num: usize) -> ArrayRef {
        create_column(&(0..row_num).map(|x| Some(x as i32)).collect_vec())
    }

    /// Runs a limit executor over `row_num` fully visible rows, split into chunks of
    /// `chunk_size`. Checks that exactly the rows `offset..offset + limit` come out,
    /// clipped to the input.
    async fn test_limit_all_visible(
        row_num: usize,
        chunk_size: usize,
        limit: usize,
        offset: usize,
    ) {
        let schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32)],
        };
        let mut mock_executor = MockExecutor::new(schema);

        let data_chunk = DataChunk::new(vec![sequence_column(row_num)], row_num);
        DataChunk::rechunk(&[data_chunk], chunk_size)
            .unwrap()
            .into_iter()
            .for_each(|x| mock_executor.add(x));

        let limit_executor = Box::new(LimitExecutor {
            child: Box::new(mock_executor),
            limit,
            offset,
            identity: "LimitExecutor2".to_owned(),
        });
        assert_eq!(limit_executor.schema().fields[0].data_type, DataType::Int32);

        let mut results = vec![];
        #[for_await]
        for chunk in limit_executor.execute() {
            results.push(chunk.unwrap());
        }

        // `offset` may exceed `row_num`, in which case nothing must be returned.
        let expected_len = min(limit, row_num.saturating_sub(offset));
        let total: usize = results.iter().map(|c| c.cardinality()).sum();
        assert_eq!(total, expected_len);
        if expected_len == 0 {
            return;
        }

        let chunks = DataChunk::rechunk(&results, row_num).unwrap();
        assert_eq!(chunks.len(), 1);
        let result = chunks.into_iter().next().unwrap();
        let col = result.column_at(0);
        assert_eq!(result.cardinality(), expected_len);
        for i in 0..result.cardinality() {
            assert_eq!(col.as_int32().value_at(i), Some((offset + i) as i32));
        }
    }

    /// Reference implementation: yields the row indices a correct limit/offset over
    /// the rows marked visible in `visible` should return.
    struct MockLimitIter {
        tot_row: usize,
        limit: usize,
        offset: usize,
        visible: Vec<bool>,
        returned: usize,
        skipped: usize,
        cur_row: usize,
    }

    impl MockLimitIter {
        fn new(tot_row: usize, limit: usize, offset: usize, visible: Vec<bool>) -> Self {
            assert_eq!(tot_row, visible.len());
            let mut cur_row = 0;
            while cur_row != tot_row && !visible[cur_row] {
                cur_row += 1;
            }
            Self {
                tot_row,
                limit,
                offset,
                visible,
                returned: 0,
                skipped: 0,
                cur_row,
            }
        }

        /// Advances `cur_row` to the next visible row, or to `tot_row` if none is left.
        fn next_visible(&mut self) {
            self.cur_row += 1;
            while self.cur_row != self.tot_row && !self.visible[self.cur_row] {
                self.cur_row += 1;
            }
        }
    }

    impl Iterator for MockLimitIter {
        type Item = usize;

        fn next(&mut self) -> Option<Self::Item> {
            if self.cur_row == self.tot_row {
                return None;
            }
            if self.returned == self.limit {
                return None;
            }
            while self.skipped < self.offset {
                self.next_visible();
                if self.cur_row == self.tot_row {
                    return None;
                }
                self.skipped += 1;
            }
            let ret = self.cur_row;
            self.next_visible();
            self.returned += 1;
            Some(ret)
        }
    }

    /// Runs a limit executor over chunks whose visibility is given by `visible`.
    /// Compares the output against [`MockLimitIter`].
    async fn test_limit_with_visibility(
        row_num: usize,
        chunk_size: usize,
        limit: usize,
        offset: usize,
        visible: Vec<bool>,
    ) {
        assert_eq!(visible.len(), row_num);
        let col0 = sequence_column(row_num);
        let col1 = BoolArray::from_iter(visible.iter().copied()).into_ref();
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Boolean),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);

        let data_chunk = DataChunk::new(vec![col0, col1], row_num);
        DataChunk::rechunk(&[data_chunk], chunk_size)
            .unwrap()
            .into_iter()
            .for_each(|x| {
                mock_executor
                    .add(x.with_visibility((x.column_at(1).as_bool()).iter().collect::<Bitmap>()))
            });

        let limit_executor = Box::new(LimitExecutor {
            child: Box::new(mock_executor),
            limit,
            offset,
            identity: "LimitExecutor2".to_owned(),
        });

        let mut results = vec![];
        #[for_await]
        for chunk in limit_executor.execute() {
            results.push(chunk.unwrap().compact());
        }
        let chunks = DataChunk::rechunk(&results, row_num).unwrap();

        if chunks.is_empty() {
            assert_eq!(
                MockLimitIter::new(row_num, limit, offset, visible).count(),
                0
            );
            return;
        }
        assert_eq!(chunks.len(), 1);
        let result = chunks.into_iter().next().unwrap();
        let col0 = result.column_at(0);
        let col1 = result.column_at(1);
        assert_eq!(
            MockLimitIter::new(row_num, limit, offset, visible.clone()).count(),
            result.cardinality()
        );
        MockLimitIter::new(row_num, limit, offset, visible)
            .zip_eq_debug(0..result.cardinality())
            .for_each(|(expect, chunk_idx)| {
                assert_eq!(col1.as_bool().value_at(chunk_idx), Some(true));
                assert_eq!(col0.as_int32().value_at(chunk_idx), Some(expect as i32));
            });
    }

    #[tokio::test]
    async fn test_limit_executor() {
        test_limit_all_visible(18, 18, 11, 0).await;
        test_limit_all_visible(18, 3, 9, 0).await;
        test_limit_all_visible(18, 3, 10, 0).await;
        test_limit_all_visible(18, 3, 11, 0).await;
    }

    #[tokio::test]
    async fn test_limit_executor_large() {
        test_limit_all_visible(1024, 1024, 512, 0).await;
        test_limit_all_visible(1024, 33, 512, 0).await;
        test_limit_all_visible(1024, 33, 515, 0).await;
    }

    #[tokio::test]
    async fn test_limit_executor_with_offset() {
        for limit in 9..12 {
            for offset in 3..6 {
                test_limit_all_visible(18, 3, limit, offset).await;
            }
        }
    }

    #[tokio::test]
    async fn test_limit_executor_offset_near_or_past_end() {
        // Only the last two rows remain after the offset.
        test_limit_all_visible(18, 3, 5, 16).await;
        // Offset consumes exactly all rows, or more than all rows.
        test_limit_all_visible(18, 3, 5, 18).await;
        test_limit_all_visible(18, 3, 5, 20).await;
    }

    #[tokio::test]
    async fn test_limit_executor_zero_limit() {
        test_limit_all_visible(18, 3, 0, 0).await;
        test_limit_all_visible(18, 3, 0, 5).await;
    }

    #[tokio::test]
    async fn test_limit_executor_with_visibility() {
        let tot_row = 6;
        for mask in 0..(1 << tot_row) {
            let visibility = (0..tot_row).map(|i| (mask >> i) & 1 == 1).collect_vec();
            for chunk_size in [1, 2, 4] {
                for limit in [0, 1, 2, 5] {
                    for offset in [0, 2, 7] {
                        test_limit_with_visibility(
                            tot_row,
                            chunk_size,
                            limit,
                            offset,
                            visibility.clone(),
                        )
                        .await;
                    }
                }
            }
        }
    }
}