1use std::ops::Range;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::{Array, ArrayBuilderImpl, ArrayImpl, DataChunk, StreamChunk};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction};
23use risingwave_expr::expr::{BoxedExpression, build_from_prost};
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25
26use crate::error::{BatchError, Result};
27use crate::executor::aggregation::build as build_agg;
28use crate::executor::{
29 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
30};
31use crate::task::ShutdownToken;
32
33pub struct SortAggExecutor {
41 aggs: Vec<BoxedAggregateFunction>,
42 group_key: Vec<BoxedExpression>,
43 child: BoxedExecutor,
44 schema: Schema,
45 identity: String,
46 output_size_limit: usize, shutdown_rx: ShutdownToken,
48}
49
50impl BoxedExecutorBuilder for SortAggExecutor {
51 async fn new_boxed_executor(
52 source: &ExecutorBuilder<'_>,
53 inputs: Vec<BoxedExecutor>,
54 ) -> Result<BoxedExecutor> {
55 let [child]: [_; 1] = inputs.try_into().unwrap();
56
57 let sort_agg_node = try_match_expand!(
58 source.plan_node().get_node_body().unwrap(),
59 NodeBody::SortAgg
60 )?;
61
62 let aggs: Vec<_> = sort_agg_node
63 .get_agg_calls()
64 .iter()
65 .map(|agg| AggCall::from_protobuf(agg).and_then(|agg| build_agg(&agg)))
66 .try_collect()?;
67
68 let group_key: Vec<_> = sort_agg_node
69 .get_group_key()
70 .iter()
71 .map(build_from_prost)
72 .try_collect()?;
73
74 let fields = group_key
75 .iter()
76 .map(|e| e.return_type())
77 .chain(aggs.iter().map(|e| e.return_type()))
78 .map(Field::unnamed)
79 .collect::<Vec<Field>>();
80
81 Ok(Box::new(Self {
82 aggs,
83 group_key,
84 child,
85 schema: Schema { fields },
86 identity: source.plan_node().get_identity().clone(),
87 output_size_limit: source.context().get_config().developer.chunk_size,
88 shutdown_rx: source.shutdown_rx().clone(),
89 }))
90 }
91}
92
93impl Executor for SortAggExecutor {
94 fn schema(&self) -> &Schema {
95 &self.schema
96 }
97
98 fn identity(&self) -> &str {
99 &self.identity
100 }
101
102 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
103 self.do_execute()
104 }
105}
106
107impl SortAggExecutor {
108 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
109 async fn do_execute(mut self: Box<Self>) {
110 let mut left_capacity = self.output_size_limit;
111 let mut agg_states: Vec<_> = self
112 .aggs
113 .iter()
114 .map(|agg| agg.create_state())
115 .try_collect()?;
116 let (mut group_builders, mut agg_builders) =
117 Self::create_builders(&self.group_key, &self.aggs);
118 let mut curr_group = if self.group_key.is_empty() {
119 Some(Vec::new())
120 } else {
121 None
122 };
123
124 #[for_await]
125 for child_chunk in self.child.execute() {
126 let child_chunk = StreamChunk::from(child_chunk?.compact());
127 let mut group_columns = Vec::with_capacity(self.group_key.len());
128 for expr in &mut self.group_key {
129 self.shutdown_rx.check()?;
130 let result = expr.eval(&child_chunk).await?;
131 group_columns.push(result);
132 }
133
134 let groups = if group_columns.is_empty() {
135 EqGroups::single_with_len(child_chunk.cardinality())
136 } else {
137 let groups: Vec<_> = group_columns
138 .iter()
139 .map(|col| EqGroups::detect(col))
140 .try_collect()?;
141 EqGroups::intersect(&groups)
142 };
143
144 for range in groups.ranges() {
145 self.shutdown_rx.check()?;
146 let group: Vec<_> = group_columns
147 .iter()
148 .map(|col| col.datum_at(range.start))
149 .collect();
150
151 if curr_group.as_ref() != Some(&group)
152 && let Some(group) = curr_group.replace(group)
153 {
154 group_builders
155 .iter_mut()
156 .zip_eq_fast(group.into_iter())
157 .for_each(|(builder, datum)| {
158 builder.append(datum);
159 });
160 Self::output_agg_states(&self.aggs, &mut agg_states, &mut agg_builders).await?;
161 left_capacity -= 1;
162
163 if left_capacity == 0 {
164 let output = DataChunk::new(
165 group_builders
166 .into_iter()
167 .chain(agg_builders)
168 .map(|b| b.finish().into())
169 .collect(),
170 self.output_size_limit,
171 );
172 yield output;
173
174 (group_builders, agg_builders) =
175 Self::create_builders(&self.group_key, &self.aggs);
176 left_capacity = self.output_size_limit;
177 }
178 }
179
180 Self::update_agg_states(&self.aggs, &mut agg_states, &child_chunk, range).await?;
181 }
182 }
183
184 if let Some(group) = curr_group.take() {
185 group_builders
186 .iter_mut()
187 .zip_eq_fast(group.into_iter())
188 .for_each(|(builder, datum)| {
189 builder.append(datum);
190 });
191 Self::output_agg_states(&self.aggs, &mut agg_states, &mut agg_builders).await?;
192 left_capacity -= 1;
193
194 let output = DataChunk::new(
195 group_builders
196 .into_iter()
197 .chain(agg_builders)
198 .map(|b| b.finish().into())
199 .collect(),
200 self.output_size_limit - left_capacity,
201 );
202 yield output;
203 }
204 }
205
206 async fn update_agg_states(
207 aggs: &[BoxedAggregateFunction],
208 agg_states: &mut [AggregateState],
209 child_chunk: &StreamChunk,
210 range: Range<usize>,
211 ) -> Result<()> {
212 for (agg, state) in aggs.iter().zip_eq_fast(agg_states.iter_mut()) {
213 agg.update_range(state, child_chunk, range.clone()).await?;
214 }
215 Ok(())
216 }
217
218 async fn output_agg_states(
219 aggs: &[BoxedAggregateFunction],
220 agg_states: &mut [AggregateState],
221 agg_builders: &mut [ArrayBuilderImpl],
222 ) -> Result<()> {
223 for ((agg, state), builder) in aggs
224 .iter()
225 .zip_eq_fast(agg_states.iter_mut())
226 .zip_eq_fast(agg_builders)
227 {
228 let result = agg.get_result(state).await?;
229 builder.append(result);
230 *state = agg.create_state()?;
231 }
232 Ok(())
233 }
234
235 fn create_builders(
236 group_key: &[BoxedExpression],
237 aggs: &[BoxedAggregateFunction],
238 ) -> (Vec<ArrayBuilderImpl>, Vec<ArrayBuilderImpl>) {
239 let group_builders = group_key
240 .iter()
241 .map(|e| e.return_type().create_array_builder(1))
242 .collect();
243
244 let agg_builders = aggs
245 .iter()
246 .map(|e| e.return_type().create_array_builder(1))
247 .collect();
248
249 (group_builders, agg_builders)
250 }
251}
252
253#[derive(Default, Debug)]
254struct EqGroups {
255 indices: Vec<usize>,
258}
259
260impl EqGroups {
261 fn new(indices: Vec<usize>) -> Self {
262 EqGroups { indices }
263 }
264
265 fn single_with_len(len: usize) -> Self {
266 EqGroups {
267 indices: vec![0, len],
268 }
269 }
270
271 fn ranges(&self) -> impl Iterator<Item = Range<usize>> + '_ {
272 EqGroupsIter {
273 indices: &self.indices,
274 curr: 0,
275 }
276 }
277
278 fn detect(array: &ArrayImpl) -> Result<EqGroups> {
280 dispatch_array_variants!(array, array, { Ok(Self::detect_inner(array)) })
281 }
282
283 fn detect_inner<T>(array: &T) -> EqGroups
284 where
285 T: Array,
286 for<'a> T::RefItem<'a>: Eq,
287 {
288 let mut indices = vec![0];
289 if array.is_empty() {
290 return EqGroups { indices };
291 }
292 let mut curr_group = array.value_at(0);
293 for i in 1..array.len() {
294 let v = array.value_at(i);
295 if v == curr_group {
296 continue;
297 }
298 curr_group = v;
299 indices.push(i);
300 }
301 indices.push(array.len());
302 EqGroups::new(indices)
303 }
304
305 fn intersect(columns: &[EqGroups]) -> EqGroups {
314 let mut indices = Vec::new();
315 use std::cmp::Reverse;
321 use std::collections::BinaryHeap;
322 let mut heap = BinaryHeap::new();
323 for (ci, column) in columns.iter().enumerate() {
324 if let Some(ri) = column.indices.first() {
325 heap.push(Reverse((ri, ci, 0)));
326 }
327 }
328 while let Some(Reverse((ri, ci, idx))) = heap.pop() {
329 if let Some(ri_next) = columns[ci].indices.get(idx + 1) {
330 heap.push(Reverse((ri_next, ci, idx + 1)));
331 }
332 if indices.last() == Some(ri) {
333 continue;
334 }
335 indices.push(*ri);
336 }
337 EqGroups::new(indices)
338 }
339}
340
341struct EqGroupsIter<'a> {
342 indices: &'a [usize],
343 curr: usize,
344}
345
346impl Iterator for EqGroupsIter<'_> {
347 type Item = Range<usize>;
348
349 fn next(&mut self) -> Option<Self::Item> {
350 if self.curr + 1 >= self.indices.len() {
351 return None;
352 }
353 let ret = self.indices[self.curr]..self.indices[self.curr + 1];
354 self.curr += 1;
355 Some(ret)
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use assert_matches::assert_matches;
362 use futures::StreamExt;
363 use futures_async_stream::for_await;
364 use risingwave_common::array::{Array as _, I64Array};
365 use risingwave_common::test_prelude::DataChunkTestExt;
366 use risingwave_common::types::DataType;
367 use risingwave_expr::expr::build_from_pretty;
368
369 use super::*;
370 use crate::executor::test_utils::MockExecutor;
371
372 #[tokio::test]
373 async fn execute_count_star_int32() -> Result<()> {
374 let schema = Schema {
376 fields: vec![
377 Field::unnamed(DataType::Int32),
378 Field::unnamed(DataType::Int32),
379 Field::unnamed(DataType::Int32),
380 ],
381 };
382 let mut child = MockExecutor::new(schema);
383 child.add(DataChunk::from_pretty(
384 "i i i
385 1 1 7
386 2 1 8
387 3 3 8
388 4 3 9",
389 ));
390 child.add(DataChunk::from_pretty(
391 "i i i
392 1 3 9
393 2 4 9
394 3 4 9
395 4 5 9",
396 ));
397 child.add(DataChunk::from_pretty(
398 "i i i
399 1 5 9
400 2 5 9
401 3 5 9
402 4 5 9",
403 ));
404
405 let count_star = build_agg(&AggCall::from_pretty("(count:int8)"))?;
406 let group_exprs: Vec<BoxedExpression> = vec![];
407 let aggs = vec![count_star];
408
409 let fields = group_exprs
411 .iter()
412 .map(|e| e.return_type())
413 .chain(aggs.iter().map(|e| e.return_type()))
414 .map(Field::unnamed)
415 .collect::<Vec<Field>>();
416
417 let executor = Box::new(SortAggExecutor {
418 aggs,
419 group_key: group_exprs,
420 child: Box::new(child),
421 schema: Schema { fields },
422 identity: "SortAggExecutor".to_owned(),
423 output_size_limit: 3,
424 shutdown_rx: ShutdownToken::empty(),
425 });
426
427 let fields = &executor.schema().fields;
428 assert_eq!(fields.len(), 1);
429 assert_eq!(fields[0].data_type, DataType::Int64);
430
431 let mut stream = executor.execute();
432 let res = stream.next().await.unwrap();
433 assert_matches!(res, Ok(_));
434 assert_matches!(stream.next().await, None);
435
436 let chunk = res?;
437 assert_eq!(chunk.cardinality(), 1);
438 let actual = chunk.column_at(0);
439 let actual_agg: &I64Array = actual.as_ref().into();
440 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
441
442 assert_eq!(v, vec![Some(12)]);
444 Ok(())
445 }
446
447 #[tokio::test]
448 async fn execute_count_star_int32_grouped() -> Result<()> {
449 let schema = Schema {
451 fields: vec![
452 Field::unnamed(DataType::Int32),
453 Field::unnamed(DataType::Int32),
454 Field::unnamed(DataType::Int32),
455 ],
456 };
457 let mut child = MockExecutor::new(schema);
458 child.add(DataChunk::from_pretty(
459 "i i i
460 1 1 7
461 2 1 8
462 3 3 8
463 4 3 9
464 5 4 9",
465 ));
466 child.add(DataChunk::from_pretty(
467 "i i i
468 1 4 9
469 2 4 9
470 3 4 9
471 4 5 9
472 5 6 9
473 6 7 9
474 7 7 9
475 8 8 9",
476 ));
477 child.add(DataChunk::from_pretty(
478 "i i i
479 1 8 9
480 2 8 9
481 3 8 9
482 4 8 9
483 5 8 9",
484 ));
485
486 let count_star = build_agg(&AggCall::from_pretty("(count:int8)"))?;
487 let group_exprs: Vec<_> = (1..=2)
488 .map(|idx| build_from_pretty(format!("${idx}:int4")))
489 .collect();
490
491 let aggs = vec![count_star];
492
493 let fields = group_exprs
495 .iter()
496 .map(|e| e.return_type())
497 .chain(aggs.iter().map(|e| e.return_type()))
498 .map(Field::unnamed)
499 .collect::<Vec<Field>>();
500
501 let executor = Box::new(SortAggExecutor {
502 aggs,
503 group_key: group_exprs,
504 child: Box::new(child),
505 schema: Schema { fields },
506 identity: "SortAggExecutor".to_owned(),
507 output_size_limit: 3,
508 shutdown_rx: ShutdownToken::empty(),
509 });
510
511 let fields = &executor.schema().fields;
512 assert_eq!(fields[0].data_type, DataType::Int32);
513 assert_eq!(fields[1].data_type, DataType::Int32);
514 assert_eq!(fields[2].data_type, DataType::Int64);
515
516 let mut stream = executor.execute();
517 let res = stream.next().await.unwrap();
518 assert_matches!(res, Ok(_));
519
520 let chunk = res?;
521 assert_eq!(chunk.cardinality(), 3);
522 let actual = chunk.column_at(2);
523 let actual_agg: &I64Array = actual.as_ref().into();
524 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
525
526 assert_eq!(v, vec![Some(1), Some(1), Some(1)]);
528 check_group_key_column(&chunk, 0, vec![Some(1), Some(1), Some(3)]);
529 check_group_key_column(&chunk, 1, vec![Some(7), Some(8), Some(8)]);
530
531 let res = stream.next().await.unwrap();
532 assert_matches!(res, Ok(_));
533
534 let chunk = res?;
535 assert_eq!(chunk.cardinality(), 3);
536 let actual = chunk.column_at(2);
537 let actual_agg: &I64Array = actual.as_ref().into();
538 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
539
540 assert_eq!(v, vec![Some(1), Some(4), Some(1)]);
541 check_group_key_column(&chunk, 0, vec![Some(3), Some(4), Some(5)]);
542 check_group_key_column(&chunk, 1, vec![Some(9), Some(9), Some(9)]);
543
544 let res = stream.next().await.unwrap();
546 assert_matches!(res, Ok(_));
547
548 let chunk = res?;
549 assert_eq!(chunk.cardinality(), 3);
550 let actual = chunk.column_at(2);
551 let actual_agg: &I64Array = actual.as_ref().into();
552 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
553
554 assert_eq!(v, vec![Some(1), Some(2), Some(6)]);
556 check_group_key_column(&chunk, 0, vec![Some(6), Some(7), Some(8)]);
557 check_group_key_column(&chunk, 1, vec![Some(9), Some(9), Some(9)]);
558
559 assert_matches!(stream.next().await, None);
560 Ok(())
561 }
562
563 #[tokio::test]
564 async fn execute_sum_int32() -> Result<()> {
565 let schema = Schema {
566 fields: vec![Field::unnamed(DataType::Int32)],
567 };
568 let mut child = MockExecutor::new(schema);
569 child.add(DataChunk::from_pretty(
570 " i
571 1
572 2
573 3
574 4
575 5
576 6
577 7
578 8
579 9
580 10",
581 ));
582
583 let sum_agg = build_agg(&AggCall::from_pretty("(sum:int8 $0:int4)"))?;
584
585 let group_exprs: Vec<BoxedExpression> = vec![];
586 let aggs = vec![sum_agg];
587 let fields = group_exprs
588 .iter()
589 .map(|e| e.return_type())
590 .chain(aggs.iter().map(|e| e.return_type()))
591 .map(Field::unnamed)
592 .collect::<Vec<Field>>();
593 let executor = Box::new(SortAggExecutor {
594 aggs,
595 group_key: vec![],
596 child: Box::new(child),
597 schema: Schema { fields },
598 identity: "SortAggExecutor".to_owned(),
599 output_size_limit: 4,
600 shutdown_rx: ShutdownToken::empty(),
601 });
602
603 let mut stream = executor.execute();
604 let chunk = stream.next().await.unwrap()?;
605 assert_matches!(stream.next().await, None);
606
607 let actual = chunk.column_at(0);
608 let actual: &I64Array = actual.as_ref().into();
609 let v = actual.iter().collect::<Vec<Option<i64>>>();
610 assert_eq!(v, vec![Some(55)]);
611
612 assert_matches!(stream.next().await, None);
613 Ok(())
614 }
615
616 #[tokio::test]
617 async fn execute_sum_int32_grouped() -> Result<()> {
618 let schema = Schema {
620 fields: vec![
621 Field::unnamed(DataType::Int32),
622 Field::unnamed(DataType::Int32),
623 Field::unnamed(DataType::Int32),
624 ],
625 };
626 let mut child = MockExecutor::new(schema);
627 child.add(DataChunk::from_pretty(
628 "i i i
629 1 1 7
630 2 1 8
631 3 3 8
632 4 3 9",
633 ));
634 child.add(DataChunk::from_pretty(
635 "i i i
636 1 3 9
637 2 4 9
638 3 4 9
639 4 5 9",
640 ));
641 child.add(DataChunk::from_pretty(
642 "i i i
643 1 5 9
644 2 5 9
645 3 5 9
646 4 5 9",
647 ));
648
649 let sum_agg = build_agg(&AggCall::from_pretty("(sum:int8 $0:int4)"))?;
650 let group_exprs: Vec<_> = (1..=2)
651 .map(|idx| build_from_pretty(format!("${idx}:int4")))
652 .collect();
653
654 let aggs = vec![sum_agg];
655
656 let fields = group_exprs
658 .iter()
659 .map(|e| e.return_type())
660 .chain(aggs.iter().map(|e| e.return_type()))
661 .map(Field::unnamed)
662 .collect::<Vec<Field>>();
663
664 let output_size_limit = 4;
665 let executor = Box::new(SortAggExecutor {
666 aggs,
667 group_key: group_exprs,
668 child: Box::new(child),
669 schema: Schema { fields },
670 identity: "SortAggExecutor".to_owned(),
671 output_size_limit,
672 shutdown_rx: ShutdownToken::empty(),
673 });
674
675 let fields = &executor.schema().fields;
676 assert_eq!(fields[0].data_type, DataType::Int32);
677 assert_eq!(fields[1].data_type, DataType::Int32);
678 assert_eq!(fields[2].data_type, DataType::Int64);
679
680 let mut stream = executor.execute();
681 let res = stream.next().await.unwrap();
682 assert_matches!(res, Ok(_));
683
684 let chunk = res?;
685 let actual = chunk.column_at(2);
686 let actual_agg: &I64Array = actual.as_ref().into();
687 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
688
689 assert_eq!(v, vec![Some(1), Some(2), Some(3), Some(5)]);
691 check_group_key_column(&chunk, 0, vec![Some(1), Some(1), Some(3), Some(3)]);
692 check_group_key_column(&chunk, 1, vec![Some(7), Some(8), Some(8), Some(9)]);
693
694 let res = stream.next().await.unwrap();
695 assert_matches!(res, Ok(_));
696
697 let chunk = res?;
698 let actual2 = chunk.column_at(2);
699 let actual_agg2: &I64Array = actual2.as_ref().into();
700 let v = actual_agg2.iter().collect::<Vec<Option<i64>>>();
701
702 assert_eq!(v, vec![Some(5), Some(14)]);
704 check_group_key_column(&chunk, 0, vec![Some(4), Some(5)]);
705 check_group_key_column(&chunk, 1, vec![Some(9), Some(9)]);
706
707 assert_matches!(stream.next().await, None);
708 Ok(())
709 }
710
711 #[tokio::test]
712 async fn execute_sum_int32_grouped_exceed_limit() -> Result<()> {
713 let schema = Schema {
715 fields: vec![
716 Field::unnamed(DataType::Int32),
717 Field::unnamed(DataType::Int32),
718 Field::unnamed(DataType::Int32),
719 ],
720 };
721 let mut child = MockExecutor::new(schema);
722 child.add(DataChunk::from_pretty(
723 " i i i
724 1 1 7
725 2 1 8
726 3 3 8
727 4 3 8
728 5 4 9
729 6 4 9
730 7 5 9
731 8 5 9
732 9 6 10
733 10 6 10",
734 ));
735 child.add(DataChunk::from_pretty(
736 " i i i
737 1 6 10
738 2 7 12",
739 ));
740
741 let sum_agg = build_agg(&AggCall::from_pretty("(sum:int8 $0:int4)"))?;
742 let group_exprs: Vec<_> = (1..=2)
743 .map(|idx| build_from_pretty(format!("${idx}:int4")))
744 .collect();
745
746 let aggs = vec![sum_agg];
747
748 let fields = group_exprs
750 .iter()
751 .map(|e| e.return_type())
752 .chain(aggs.iter().map(|e| e.return_type()))
753 .map(Field::unnamed)
754 .collect::<Vec<Field>>();
755
756 let executor = Box::new(SortAggExecutor {
757 aggs,
758 group_key: group_exprs,
759 child: Box::new(child),
760 schema: Schema { fields },
761 identity: "SortAggExecutor".to_owned(),
762 output_size_limit: 3,
763 shutdown_rx: ShutdownToken::empty(),
764 });
765
766 let fields = &executor.schema().fields;
767 assert_eq!(fields[0].data_type, DataType::Int32);
768 assert_eq!(fields[1].data_type, DataType::Int32);
769 assert_eq!(fields[2].data_type, DataType::Int64);
770
771 let mut stream = executor.execute();
773 let res = stream.next().await.unwrap();
774 assert_matches!(res, Ok(_));
775
776 let chunk = res?;
777 let actual = chunk.column_at(2);
778 let actual_agg: &I64Array = actual.as_ref().into();
779 let v = actual_agg.iter().collect::<Vec<Option<i64>>>();
780 assert_eq!(v, vec![Some(1), Some(2), Some(7)]);
781 check_group_key_column(&chunk, 0, vec![Some(1), Some(1), Some(3)]);
782 check_group_key_column(&chunk, 1, vec![Some(7), Some(8), Some(8)]);
783
784 let res = stream.next().await.unwrap();
786 assert_matches!(res, Ok(_));
787
788 let chunk = res?;
789 let actual2 = chunk.column_at(2);
790 let actual_agg2: &I64Array = actual2.as_ref().into();
791 let v = actual_agg2.iter().collect::<Vec<Option<i64>>>();
792 assert_eq!(v, vec![Some(11), Some(15), Some(20)]);
793 check_group_key_column(&chunk, 0, vec![Some(4), Some(5), Some(6)]);
794 check_group_key_column(&chunk, 1, vec![Some(9), Some(9), Some(10)]);
795
796 let res = stream.next().await.unwrap();
798 assert_matches!(res, Ok(_));
799
800 let chunk = res?;
801 let actual2 = chunk.column_at(2);
802 let actual_agg2: &I64Array = actual2.as_ref().into();
803 let v = actual_agg2.iter().collect::<Vec<Option<i64>>>();
804
805 assert_eq!(v, vec![Some(2)]);
806 check_group_key_column(&chunk, 0, vec![Some(7)]);
807 check_group_key_column(&chunk, 1, vec![Some(12)]);
808
809 assert_matches!(stream.next().await, None);
810 Ok(())
811 }
812
813 fn check_group_key_column(actual: &DataChunk, col_idx: usize, expect: Vec<Option<i32>>) {
814 assert_eq!(
815 actual
816 .column_at(col_idx)
817 .as_int32()
818 .iter()
819 .collect::<Vec<_>>(),
820 expect
821 );
822 }
823
824 #[tokio::test]
825 async fn test_shutdown_rx() -> Result<()> {
826 let child = MockExecutor::with_chunk(
827 DataChunk::from_pretty(
828 "i
829 4",
830 ),
831 Schema::new(vec![Field::unnamed(DataType::Int32)]),
832 );
833
834 let sum_agg = build_agg(&AggCall::from_pretty("(sum:int8 $0:int4)"))?;
835 let group_exprs: Vec<_> = (1..=2)
836 .map(|idx| build_from_pretty(format!("${idx}:int4")))
837 .collect();
838
839 let aggs = vec![sum_agg];
840
841 let fields = group_exprs
843 .iter()
844 .map(|e| e.return_type())
845 .chain(aggs.iter().map(|e| e.return_type()))
846 .map(Field::unnamed)
847 .collect::<Vec<Field>>();
848
849 let output_size_limit = 4;
850 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
851 let executor = Box::new(SortAggExecutor {
852 aggs,
853 group_key: group_exprs,
854 child: Box::new(child),
855 schema: Schema { fields },
856 identity: "SortAggExecutor".to_owned(),
857 output_size_limit,
858 shutdown_rx,
859 });
860 shutdown_tx.cancel();
861 #[for_await]
862 for data in executor.execute() {
863 assert!(data.is_err());
864 break;
865 }
866
867 Ok(())
868 }
869}