risingwave_batch_executors/executor/
limit.rs1use std::cmp::min;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::Schema;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{
26 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
27};
28
29pub struct LimitExecutor {
31 child: BoxedExecutor,
32 limit: usize,
34 offset: usize,
36 identity: String,
38}
39
40impl BoxedExecutorBuilder for LimitExecutor {
41 async fn new_boxed_executor(
42 source: &ExecutorBuilder<'_>,
43 inputs: Vec<BoxedExecutor>,
44 ) -> Result<BoxedExecutor> {
45 let [child]: [_; 1] = inputs.try_into().unwrap();
46
47 let limit_node =
48 try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Limit)?;
49
50 let limit = limit_node.get_limit() as usize;
51 let offset = limit_node.get_offset() as usize;
52
53 Ok(Box::new(Self::new(
54 child,
55 limit,
56 offset,
57 source.plan_node().get_identity().clone(),
58 )))
59 }
60}
61
62impl LimitExecutor {
63 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
64 async fn do_execute(self: Box<Self>) {
65 if self.limit == 0 {
66 return Ok(());
67 }
68 let mut skipped = 0;
70 let mut returned = 0;
72
73 #[for_await]
74 for data_chunk in self.child.execute() {
75 if returned == self.limit {
76 break;
77 }
78 let data_chunk = data_chunk?;
79 let cardinality = data_chunk.cardinality();
80 if cardinality + skipped <= self.offset {
81 skipped += cardinality;
82 continue;
83 }
84
85 if skipped == self.offset && cardinality + returned <= self.limit {
86 returned += cardinality;
87 yield data_chunk;
88 continue;
89 }
90 let mut new_vis;
92 if !data_chunk.is_compacted() {
93 new_vis = data_chunk.visibility().iter().collect_vec();
94 for vis in new_vis.iter_mut().filter(|x| **x) {
95 if skipped < self.offset {
96 skipped += 1;
97 *vis = false;
98 } else if returned < self.limit {
99 returned += 1;
100 } else {
101 *vis = false;
102 }
103 }
104 } else {
105 let chunk_size = data_chunk.capacity();
106 new_vis = vec![false; chunk_size];
107 let l = self.offset - skipped;
108 let r = min(l + self.limit - returned, chunk_size);
109 new_vis[l..r].fill(true);
110 returned += r - l;
111 skipped += l;
112 }
113 yield data_chunk
114 .with_visibility(new_vis.into_iter().collect::<Bitmap>())
115 .compact();
116 }
117 }
118}
119
120impl Executor for LimitExecutor {
121 fn schema(&self) -> &Schema {
122 self.child.schema()
123 }
124
125 fn identity(&self) -> &str {
126 &self.identity
127 }
128
129 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
130 self.do_execute()
131 }
132}
133
134impl LimitExecutor {
135 pub fn new(child: BoxedExecutor, limit: usize, offset: usize, identity: String) -> Self {
136 Self {
137 child,
138 limit,
139 offset,
140 identity,
141 }
142 }
143}
144
145#[cfg(test)]
146mod tests {
147
148 use std::vec;
149
150 use futures_async_stream::for_await;
151 use risingwave_common::array::{Array, ArrayRef, BoolArray, PrimitiveArray};
152 use risingwave_common::catalog::Field;
153 use risingwave_common::types::DataType;
154 use risingwave_common::util::iter_util::ZipEqDebug;
155
156 use super::*;
157 use crate::executor::test_utils::MockExecutor;
158
159 fn create_column(vec: &[Option<i32>]) -> ArrayRef {
160 PrimitiveArray::from_iter(vec).into_ref()
161 }
162
163 async fn test_limit_all_visible(
164 row_num: usize,
165 chunk_size: usize,
166 limit: usize,
167 offset: usize,
168 ) {
169 let col = create_column(
170 (0..row_num)
171 .map(|x| Some(x as i32))
172 .collect_vec()
173 .as_slice(),
174 );
175 let schema = Schema {
176 fields: vec![Field::unnamed(DataType::Int32)],
177 };
178 let mut mock_executor = MockExecutor::new(schema);
179
180 let data_chunk = DataChunk::new([col].to_vec(), row_num);
181
182 DataChunk::rechunk(&[data_chunk], chunk_size)
183 .unwrap()
184 .into_iter()
185 .for_each(|x| mock_executor.add(x));
186 let limit_executor = Box::new(LimitExecutor {
187 child: Box::new(mock_executor),
188 limit,
189 offset,
190 identity: "LimitExecutor2".to_owned(),
191 });
192 let fields = &limit_executor.schema().fields;
193 assert_eq!(fields[0].data_type, DataType::Int32);
194 let mut results = vec![];
195 let stream = limit_executor.execute();
196 #[for_await]
197 for chunk in stream {
198 let chunk = chunk.unwrap();
199 results.push(chunk);
200 }
201 let chunks =
202 DataChunk::rechunk(results.into_iter().collect_vec().as_slice(), row_num).unwrap();
203 assert_eq!(chunks.len(), 1);
204 let result = chunks.into_iter().next().unwrap();
205 let col = result.column_at(0);
206 assert_eq!(result.cardinality(), min(limit, row_num - offset));
207 for i in 0..result.cardinality() {
208 assert_eq!(col.as_int32().value_at(i), Some((offset + i) as i32));
209 }
210 }
211
/// Reference implementation of limit/offset over a visibility mask.
///
/// Iterating yields the indices of the rows that should survive: the visible rows,
/// minus the first `offset` of them, capped at `limit` rows.
pub(crate) struct MockLimitIter {
    /// Total number of rows; equals `visible.len()`.
    tot_row: usize,
    /// Maximum number of row indices to yield.
    limit: usize,
    /// Number of visible rows to skip before yielding.
    offset: usize,
    /// Per-row visibility flags.
    visible: Vec<bool>,
    /// Row indices yielded so far.
    returned: usize,
    /// Visible rows skipped so far.
    skipped: usize,
    /// Index of the current visible row, or `tot_row` once exhausted.
    cur_row: usize,
}

impl MockLimitIter {
    /// Creates the iterator positioned on the first visible row.
    ///
    /// # Panics
    ///
    /// Panics if `tot_row` differs from `visible.len()`.
    fn new(tot_row: usize, limit: usize, offset: usize, visible: Vec<bool>) -> Self {
        assert_eq!(tot_row, visible.len());
        // First visible row, or `tot_row` when nothing is visible.
        let cur_row = visible.iter().position(|&flag| flag).unwrap_or(tot_row);
        Self {
            tot_row,
            limit,
            offset,
            visible,
            returned: 0,
            skipped: 0,
            cur_row,
        }
    }

    /// Advances `cur_row` to the next visible row, or to `tot_row` if none is left.
    fn next_visible(&mut self) {
        loop {
            self.cur_row += 1;
            if self.cur_row == self.tot_row || self.visible[self.cur_row] {
                break;
            }
        }
    }
}

impl Iterator for MockLimitIter {
    type Item = usize;

    /// Yields the next surviving row index, consuming the offset on first use.
    fn next(&mut self) -> Option<Self::Item> {
        let exhausted = self.cur_row == self.tot_row;
        let satisfied = self.returned == self.limit;
        if exhausted || satisfied {
            return None;
        }

        // Burn through the offset; running out of rows here means there is no output.
        while self.skipped < self.offset {
            self.next_visible();
            if self.cur_row == self.tot_row {
                return None;
            }
            self.skipped += 1;
        }

        let row = self.cur_row;
        self.next_visible();
        self.returned += 1;
        Some(row)
    }
}
271
272 async fn test_limit_with_visibility(
273 row_num: usize,
274 chunk_size: usize,
275 limit: usize,
276 offset: usize,
277 visible: Vec<bool>,
278 ) {
279 assert_eq!(visible.len(), row_num);
280 let col0 = create_column(
281 (0..row_num)
282 .map(|x| Some(x as i32))
283 .collect_vec()
284 .as_slice(),
285 );
286
287 let visible_array = BoolArray::from_iter(visible.iter().cloned());
288
289 let col1 = visible_array.into_ref();
290 let schema = Schema {
291 fields: vec![
292 Field::unnamed(DataType::Int32),
293 Field::unnamed(DataType::Boolean),
294 ],
295 };
296 let mut mock_executor = MockExecutor::new(schema);
297
298 let data_chunk = DataChunk::new([col0, col1].to_vec(), row_num);
299
300 DataChunk::rechunk(&[data_chunk], chunk_size)
301 .unwrap()
302 .into_iter()
303 .for_each(|x| {
304 mock_executor
305 .add(x.with_visibility((x.column_at(1).as_bool()).iter().collect::<Bitmap>()))
306 });
307
308 let limit_executor = Box::new(LimitExecutor {
309 child: Box::new(mock_executor),
310 limit,
311 offset,
312 identity: "LimitExecutor2".to_owned(),
313 });
314
315 let mut results = vec![];
316 let stream = limit_executor.execute();
317 #[for_await]
318 for chunk in stream {
319 results.push(chunk.unwrap().compact());
320 }
321 let chunks =
322 DataChunk::rechunk(results.into_iter().collect_vec().as_slice(), row_num).unwrap();
323
324 if chunks.is_empty() {
325 assert_eq!(
326 MockLimitIter::new(row_num, limit, offset, visible).count(),
327 0
328 );
329 return;
330 }
331 assert_eq!(chunks.len(), 1);
332 let result = chunks.into_iter().next().unwrap();
333 let col0 = result.column_at(0);
334 let col1 = result.column_at(1);
335 assert_eq!(
336 MockLimitIter::new(row_num, limit, offset, visible.clone()).count(),
337 result.cardinality()
338 );
339 MockLimitIter::new(row_num, limit, offset, visible)
340 .zip_eq_debug(0..result.cardinality())
341 .for_each(|(expect, chunk_idx)| {
342 assert_eq!(col1.as_bool().value_at(chunk_idx), Some(true));
343 assert_eq!(col0.as_int32().value_at(chunk_idx), Some(expect as i32));
344 });
345 }
346
347 #[tokio::test]
348 async fn test_limit_executor() {
349 test_limit_all_visible(18, 18, 11, 0).await;
350 test_limit_all_visible(18, 3, 9, 0).await;
351 test_limit_all_visible(18, 3, 10, 0).await;
352 test_limit_all_visible(18, 3, 11, 0).await;
353 }
354
355 #[tokio::test]
356 async fn test_limit_executor_large() {
357 test_limit_all_visible(1024, 1024, 512, 0).await;
358 test_limit_all_visible(1024, 33, 512, 0).await;
359 test_limit_all_visible(1024, 33, 515, 0).await;
360 }
361
362 #[tokio::test]
363 async fn test_limit_executor_with_offset() {
364 for limit in 9..12 {
365 for offset in 3..6 {
366 test_limit_all_visible(18, 3, limit, offset).await;
367 }
368 }
369 }
370
371 #[tokio::test]
372 async fn test_limit_executor_with_visibility() {
373 let tot_row = 6;
374 for mask in 0..(1 << tot_row) {
375 let mut visibility = vec![];
376 for i in 0..tot_row {
377 visibility.push((mask >> i) & 1 == 1);
378 }
379 test_limit_with_visibility(tot_row, 2, 2, 2, visibility).await;
380 }
381 }
382}