use std::sync::Arc;

use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::memcmp_encoding::encode_chunk;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::Message;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;

use super::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
    WrapStreamExecutor,
};
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillOp,
};

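/// Batch sort executor.
///
/// Buffers all input from the child executor, memcmp-encodes the sort keys of every row, and
/// sorts the encoded rows in memory. When a spill backend is configured and the memory limit is
/// exceeded, it falls back to an external sort: the input is spilled to disk in partitions, each
/// partition is sorted by a sub-executor, and the sorted partitions are merged.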
pub struct SortExecutor {
    child: BoxedExecutor,
    column_orders: Arc<Vec<ColumnOrder>>,
    identity: String,
    schema: Schema,
    chunk_size: usize,
    mem_context: MemoryContext,
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    /// Memory budget assumed for this executor. Only set for the sub-sorts created while
    /// spilling, where it is the estimated size of the partition being re-sorted.
    memory_upper_bound: Option<u64>,
}

impl Executor for SortExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}

impl BoxedExecutorBuilder for SortExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        let [child]: [_; 1] = inputs.try_into().unwrap();

        let order_by_node =
            try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Sort)?;

        let column_orders = order_by_node
            .column_orders
            .iter()
            .map(ColumnOrder::from_protobuf)
            .collect_vec();

        let identity = source.plan_node().get_identity();
        Ok(Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            identity.clone(),
            source.context().get_config().developer.chunk_size,
            source.context().create_executor_mem_context(identity),
            if source.context().get_config().enable_spill {
                Some(Disk)
            } else {
                None
            },
            source.context().spill_metrics(),
        )))
    }
}

impl SortExecutor {
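    /// Sorts the entire input and emits it as a stream of chunks.
    ///
    /// In-memory path: buffer every input chunk, memcmp-encode the sort keys, sort the encoded
    /// rows, and rebuild output chunks in sorted order.
    ///
    /// Spill path: if the memory context reports the limit as exceeded and a spill backend is
    /// available, all input is partitioned round-robin onto the spill backend; each partition is
    /// then sorted recursively by a sub-`SortExecutor`, and the sorted partitions are combined
    /// by a `MergeSortExecutor`.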
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let child_schema = self.child.schema().clone();
        let mut need_to_spill = false;
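        // Only enforce memory accounting when the budget is meaningfully large. Sub-sorts whose
        // partition is already at or below SPILL_AT_LEAST_MEMORY skip the check: spilling such a
        // small partition again would make no progress.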
        let check_memory = match self.memory_upper_bound {
            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
            None => true,
        };

        let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
        let mut chunks = Vec::new_in(self.mem_context.global_allocator());

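        // Phase 1: drain the child, compacting and buffering every chunk while charging its
        // estimated heap size to the memory context.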
        let mut input_stream = self.child.execute();
        #[for_await]
        for chunk in &mut input_stream {
            let chunk = chunk?.compact();
            let chunk_estimated_heap_size = chunk.estimated_heap_size();
            chunks.push(chunk);
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

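        // Phase 2: memcmp-encode the sort keys of every buffered row so that ordering can be
        // decided by plain byte comparison. The encoded keys are charged to the memory context
        // as well.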
        let mut encoded_rows =
            Vec::with_capacity_in(chunks.len(), self.mem_context.global_allocator());

        for chunk in &chunks {
            let encoded_chunk = encode_chunk(chunk, &self.column_orders)?;
            let chunk_estimated_heap_size = encoded_chunk
                .iter()
                .map(|x| x.estimated_heap_size())
                .sum::<usize>();
            encoded_rows.extend(
                encoded_chunk
                    .into_iter()
                    .enumerate()
                    .map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
            );
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

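        // Spill path, a.k.a. external sort: partition all buffered chunks plus the rest of the
        // input round-robin across the spill backend, so that each partition is expected to fit
        // in memory. Every partition is then sorted by a sub-`SortExecutor`, and the sorted
        // partitions are combined by a `MergeSortExecutor` into one totally ordered stream.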
        if need_to_spill {
            info!("batch sort executor {} starts to spill out", &self.identity);
            let mut sort_spill_manager = SortSpillManager::new(
                self.spill_backend.clone().unwrap(),
                &self.identity,
                DEFAULT_SPILL_PARTITION_NUM,
                child_schema.data_types(),
                self.chunk_size,
                self.spill_metrics.clone(),
            )?;
            sort_spill_manager.init_writers().await?;

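            // The encoded keys will be rebuilt per partition by the sub-sorts; release them
            // before spilling.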
            drop(encoded_rows);

            for chunk in chunks {
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

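            // Spill the not-yet-consumed remainder of the input stream as well.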
            #[for_await]
            for chunk in input_stream {
                let chunk: DataChunk = chunk?;
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

            sort_spill_manager.close_writers().await?;

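            // Read each partition back and sort it with a sub-executor. The partition's on-disk
            // size becomes the sub-sort's memory upper bound.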
            let partition_num = sort_spill_manager.partition_num;
            let mut sorted_inputs: Vec<BoxedExecutor> = Vec::with_capacity(partition_num);
            for i in 0..partition_num {
                let partition_size = sort_spill_manager.estimate_partition_size(i).await?;

                let input_stream = sort_spill_manager.read_input_partition(i).await?;

                let sub_sort_executor: SortExecutor = SortExecutor::new_inner(
                    Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)),
                    self.column_orders.clone(),
                    format!("{}-sub{}", self.identity.clone(), i),
                    self.chunk_size,
                    self.mem_context.clone(),
                    self.spill_backend.clone(),
                    self.spill_metrics.clone(),
                    Some(partition_size),
                );

                debug!(
                    "create sub_sort {} for sort {} to spill",
                    sub_sort_executor.identity, self.identity
                );

                sorted_inputs.push(Box::new(sub_sort_executor));
            }

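            // Merge the per-partition sorted streams into a single totally ordered output.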
            let merge_sort = MergeSortExecutor::new(
                sorted_inputs,
                self.column_orders.clone(),
                self.schema.clone(),
                format!("{}-merge-sort", self.identity.clone()),
                self.chunk_size,
                self.mem_context.clone(),
            );

            #[for_await]
            for chunk in Box::new(merge_sort).execute() {
                yield chunk?;
            }
        } else {
            // In-memory path: sort the encoded keys and re-emit the rows through the chunk
            // builder in sorted order.
            encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

            for (row, _) in encoded_rows {
                if let Some(spilled) = chunk_builder.append_one_row(row) {
                    yield spilled
                }
            }

            if let Some(spilled) = chunk_builder.consume_all() {
                yield spilled
            }
        }
    }
}

impl SortExecutor {
    pub fn new(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Self {
        Self::new_inner(
            child,
            column_orders,
            identity,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            None,
        )
    }

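    /// Like [`SortExecutor::new`], but additionally takes `memory_upper_bound`. It is used when
    /// building the sub-sorts for spilled partitions, with the estimated partition size as the
    /// bound.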
    fn new_inner(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
        memory_upper_bound: Option<u64>,
    ) -> Self {
        let schema = child.schema().clone();
        Self {
            child,
            column_orders,
            identity,
            schema,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            memory_upper_bound,
        }
    }
}

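/// `SortSpillManager` distributes the rows of the sort input round-robin across
/// `partition_num` spill files, so the partitions end up roughly equal in size. Chunks are
/// serialized as length-prefixed protobuf `DataChunk`s: a little-endian `u32` byte length
/// followed by the encoded chunk. Round-robin placement is sufficient here (no key-based
/// partitioning is needed) because every partition is fully re-sorted and then merge-sorted.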
struct SortSpillManager {
    op: SpillOp,
    partition_num: usize,
    round_robin_idx: usize,
    input_writers: Vec<opendal::Writer>,
    input_chunk_builders: Vec<DataChunkBuilder>,
    child_data_types: Vec<DataType>,
    spill_chunk_size: usize,
    spill_metrics: Arc<BatchSpillMetrics>,
}

impl SortSpillManager {
    fn new(
        spill_backend: SpillBackend,
        identity: &str,
        partition_num: usize,
        child_data_types: Vec<DataType>,
        spill_chunk_size: usize,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Result<Self> {
        // Each executor instance spills into its own uuid-suffixed directory.
        let suffix_uuid = uuid::Uuid::new_v4();
        let dir = format!("/{}-{}/", identity, suffix_uuid);
        let op = SpillOp::create(dir, spill_backend)?;
        let input_writers = Vec::with_capacity(partition_num);
        let input_chunk_builders = Vec::with_capacity(partition_num);
        Ok(Self {
            op,
            partition_num,
            input_writers,
            input_chunk_builders,
            round_robin_idx: 0,
            child_data_types,
            spill_chunk_size,
            spill_metrics,
        })
    }

    async fn init_writers(&mut self) -> Result<()> {
        for i in 0..self.partition_num {
            let partition_file_name = format!("input-chunks-p{}", i);
            let w = self.op.writer_with(&partition_file_name).await?;
            self.input_writers.push(w);
            self.input_chunk_builders.push(DataChunkBuilder::new(
                self.child_data_types.clone(),
                self.spill_chunk_size,
            ));
        }
        Ok(())
    }

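    /// Appends each row of `chunk` to the current partition's chunk builder, advancing the
    /// round-robin cursor per row. Whenever a builder fills up, the finished chunk is written to
    /// that partition's file as a little-endian `u32` length followed by the protobuf-encoded
    /// chunk.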
    async fn write_input_chunk(&mut self, chunk: DataChunk) -> Result<()> {
        for row in chunk.rows() {
            let partition = self.round_robin_idx;
            if let Some(chunk) = self.input_chunk_builders[partition].append_one_row(row) {
                let chunk_pb: PbDataChunk = chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
            self.round_robin_idx = (self.round_robin_idx + 1) % self.partition_num;
        }
        Ok(())
    }

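    /// Flushes any partially filled chunk builders in the same on-disk format and closes all
    /// partition writers.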
    async fn close_writers(&mut self) -> Result<()> {
        for partition in 0..self.partition_num {
            if let Some(output_chunk) = self.input_chunk_builders[partition].consume_all() {
                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
        }

        for mut w in self.input_writers.drain(..) {
            w.close().await?;
        }
        Ok(())
    }

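    /// Opens partition `partition` for reading and returns a stream that decodes the
    /// length-prefixed chunks written by `write_input_chunk`.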
    async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let r = self.op.reader_with(&input_partition_file_name).await?;
        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
    }

    async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let input_size = self
            .op
            .stat(&input_partition_file_name)
            .await?
            .content_length();
        Ok(input_size)
    }
}

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::*;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{Date, F32, Interval, Scalar, StructType, Time, Timestamp};
    use risingwave_common::util::sort_util::OrderType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    #[tokio::test]
    async fn test_simple_order_by_executor() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i
             1 3
             2 2
             3 1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];

        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int32);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_int32().value_at(0), Some(3));
            assert_eq!(col0.as_int32().value_at(1), Some(2));
            assert_eq!(col0.as_int32().value_at(2), Some(1));
        }
    }

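    /// Floats must sort correctly through the memcmp encoding, i.e. the encoded bytes compare
    /// the same way as the numeric values, including negative values across the sign boundary.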
    #[tokio::test]
    async fn test_encoding_for_float() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f F
             -2.2 3.3
             -1.1 2.2
             1.1 1.1
             2.2 -1.1
             3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }

    #[tokio::test]
    async fn test_bsc_for_string() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Varchar),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "T T
             1.1 3.3
             2.2 2.2
             3.3 1.1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Varchar);
        assert_eq!(fields[1].data_type, DataType::Varchar);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_utf8().value_at(0), Some("3.3"));
            assert_eq!(col0.as_utf8().value_at(1), Some("2.2"));
            assert_eq!(col0.as_utf8().value_at(2), Some("1.1"));
        }
    }

    #[tokio::test]
    async fn test_encoding_for_boolean_int32_float64() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Boolean),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let input_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([Some(false), Some(true), None, None, None]).into_ref(),
                I32Array::from_iter([Some(3), Some(3), None, None, None]).into_ref(),
                F64Array::from_iter([None, None, Some(3.5), Some(-4.3), None]).into_ref(),
            ],
            5,
        );
        // Order by: Float64 asc, Int32 desc, Boolean asc. Ascending order places NULLs last,
        // descending places them first, which yields the permutation below.
        let output_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([None, None, None, Some(false), Some(true)]).into_ref(),
                I32Array::from_iter([None, None, None, Some(3), Some(3)]).into_ref(),
                F64Array::from_iter([Some(-4.3), Some(3.5), None, None, None]).into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_decimal_date_varchar() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Decimal),
                Field::unnamed(DataType::Date),
            ],
        };
        let input_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["abc", "b", "abc", "abcdefgh", "b"]).into_ref(),
                DecimalArray::from_iter([None, Some((-3).into()), None, None, Some(7.into())])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(345).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        let output_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["b", "b", "abcdefgh", "abc", "abc"]).into_ref(),
                DecimalArray::from_iter([Some(7.into()), Some((-3).into()), None, None, None])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(345).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_time_timestamp_interval() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Time),
                Field::unnamed(DataType::Timestamp),
                Field::unnamed(DataType::Interval),
            ],
        };
        let input_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    None,
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        let output_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                    None,
                    None,
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    None,
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            column_orders.into(),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_struct_list() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(
                    StructType::unnamed(vec![DataType::Varchar, DataType::Float32]).into(),
                ),
                Field::unnamed(DataType::List(Box::new(DataType::Int64))),
            ],
        };
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder =
            ListArrayBuilder::with_type(0, DataType::List(Box::new(DataType::Int64)));
        let input_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder =
            ListArrayBuilder::with_type(0, DataType::List(Box::new(DataType::Int64)));
        let output_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

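    /// Exercises the spill path end to end: the test memory context triggers spilling, and
    /// `SpillBackend::Memory` keeps the spilled partitions in memory, so the executor must take
    /// the partition / sub-sort / merge route and still produce the fully sorted output.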
    #[tokio::test]
    async fn test_spill_out() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f F
             -2.2 3.3
             -1.1 2.2
             1.1 1.1
             2.2 -1.1
             3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::for_spill_test(),
            Some(SpillBackend::Memory),
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }
}