risingwave_batch_executors/executor/
order_by.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::memcmp_encoding::encode_chunk;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::Message;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;

use super::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
    WrapStreamExecutor,
};
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillOp,
};

/// Sort Executor
///
/// High-level idea:
/// 1. Load data chunks from the child executor
/// 2. Serialize each row into memcomparable format
/// 3. Sort the serialized rows by quicksort
/// 4. Build and yield data chunks according to the row order
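///
/// A minimal sketch of steps 2 and 3, using illustrative plain byte keys in
/// place of the real `memcmp_encoding::encode_chunk` output:
/// ```
/// // Comparing the memcomparable byte strings is equivalent to comparing
/// // the rows under the requested sort orders.
/// let mut encoded_rows = vec![("row-b", vec![2u8]), ("row-a", vec![1u8])];
/// encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
/// assert_eq!(encoded_rows[0].0, "row-a");
/// ```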
pub struct SortExecutor {
    child: BoxedExecutor,
    column_orders: Arc<Vec<ColumnOrder>>,
    identity: String,
    schema: Schema,
    chunk_size: usize,
    mem_context: MemoryContext,
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    /// The upper bound of memory usage for this executor.
    memory_upper_bound: Option<u64>,
}

impl Executor for SortExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}

impl BoxedExecutorBuilder for SortExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        let [child]: [_; 1] = inputs.try_into().unwrap();

        let order_by_node =
            try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Sort)?;

        let column_orders = order_by_node
            .column_orders
            .iter()
            .map(ColumnOrder::from_protobuf)
            .collect_vec();

        let identity = source.plan_node().get_identity();
        Ok(Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            identity.clone(),
            source.context().get_config().developer.chunk_size,
            source.context().create_executor_mem_context(identity),
            if source.context().get_config().enable_spill {
                Some(Disk)
            } else {
                None
            },
            source.context().spill_metrics(),
        )))
    }
}

impl SortExecutor {
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let child_schema = self.child.schema().clone();
        let mut need_to_spill = false;
        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
        let check_memory = match self.memory_upper_bound {
            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
            None => true,
        };

        let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
        let mut chunks = Vec::new_in(self.mem_context.global_allocator());

        let mut input_stream = self.child.execute();
        #[for_await]
        for chunk in &mut input_stream {
            let chunk = chunk?.compact();
            let chunk_estimated_heap_size = chunk.estimated_heap_size();
            chunks.push(chunk);
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

        let mut encoded_rows =
            Vec::with_capacity_in(chunks.len(), self.mem_context.global_allocator());

        for chunk in &chunks {
            let encoded_chunk = encode_chunk(chunk, &self.column_orders)?;
            let chunk_estimated_heap_size = encoded_chunk
                .iter()
                .map(|x| x.estimated_heap_size())
                .sum::<usize>();
            encoded_rows.extend(
                encoded_chunk
                    .into_iter()
                    .enumerate()
                    .map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
            );
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

        if need_to_spill {
            // A spilling version of sort, a.k.a. external sort.
            // When the `SortExecutor` is told that memory is insufficient, `SortSpillManager`
            // partitions the sort buffer and spills it to disk. After spilling the sort buffer,
            // `SortSpillManager` consumes all remaining chunks from its input executor.
            // Finally, we get `DEFAULT_SPILL_PARTITION_NUM` (e.g. 20) partitions, each containing
            // a portion of the original input data.
            // A sub `SortExecutor` sorts each partition, and a `MergeSortExecutor` merges all
            // sorted partitions.
            // If memory is still insufficient in a sub `SortExecutor`, it spills its input recursively.
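            //
            // Illustrative data flow (a sketch, not literal types):
            //   input rows --round-robin--> partition files 0..DEFAULT_SPILL_PARTITION_NUM
            //   partition i --> sub SortExecutor i --> MergeSortExecutor --> sorted output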
            info!("batch sort executor {} starts to spill out", &self.identity);
            let mut sort_spill_manager = SortSpillManager::new(
                self.spill_backend.clone().unwrap(),
                &self.identity,
                DEFAULT_SPILL_PARTITION_NUM,
                child_schema.data_types(),
                self.chunk_size,
                self.spill_metrics.clone(),
            )?;
            sort_spill_manager.init_writers().await?;

            // Release memory.
            drop(encoded_rows);

            // Spill the buffered chunks.
            for chunk in chunks {
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

            // Spill the remaining input chunks.
            #[for_await]
            for chunk in input_stream {
                let chunk: DataChunk = chunk?;
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

            sort_spill_manager.close_writers().await?;

            let partition_num = sort_spill_manager.partition_num;
            // Merge the sorted partitions.
            let mut sorted_inputs: Vec<BoxedExecutor> = Vec::with_capacity(partition_num);
            for i in 0..partition_num {
                let partition_size = sort_spill_manager.estimate_partition_size(i).await?;

                let input_stream = sort_spill_manager.read_input_partition(i).await?;

                let sub_sort_executor: SortExecutor = SortExecutor::new_inner(
                    Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)),
                    self.column_orders.clone(),
                    format!("{}-sub{}", self.identity.clone(), i),
                    self.chunk_size,
                    self.mem_context.clone(),
                    self.spill_backend.clone(),
                    self.spill_metrics.clone(),
                    Some(partition_size),
                );

                debug!(
                    "create sub_sort {} for sort {} to spill",
                    sub_sort_executor.identity, self.identity
                );

                sorted_inputs.push(Box::new(sub_sort_executor));
            }

            let merge_sort = MergeSortExecutor::new(
                sorted_inputs,
                self.column_orders.clone(),
                self.schema.clone(),
                format!("{}-merge-sort", self.identity.clone()),
                self.chunk_size,
                self.mem_context.clone(),
            );

            #[for_await]
            for chunk in Box::new(merge_sort).execute() {
                yield chunk?;
            }
        } else {
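            // `encode_chunk` produced memcomparable encodings, so sorting the raw
            // bytes sorts the rows according to `column_orders`.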
            encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

            for (row, _) in encoded_rows {
                if let Some(spilled) = chunk_builder.append_one_row(row) {
                    yield spilled
                }
            }

            if let Some(spilled) = chunk_builder.consume_all() {
                yield spilled
            }
        }
    }
}

impl SortExecutor {
    pub fn new(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Self {
        Self::new_inner(
            child,
            column_orders,
            identity,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            None,
        )
    }

    fn new_inner(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
        memory_upper_bound: Option<u64>,
    ) -> Self {
        let schema = child.schema().clone();
        Self {
            child,
            column_orders,
            identity,
            schema,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            memory_upper_bound,
        }
    }
}

/// `SortSpillManager` manages how spill data files are written and read back.
/// The spill data is first partitioned in a round-robin way; each partition consists of one
/// spill file. Each data chunk is serialized into protobuf bytes and appended to the file
/// with a length prefix, so the file content looks like the following.
/// The write pattern is append-only and the read pattern is a sequential scan,
/// which maximizes disk IO performance.
///
/// ```text
/// [proto_len]
/// [proto_bytes]
/// ...
/// [proto_len]
/// [proto_bytes]
/// ```
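///
/// A minimal sketch of this framing, using a plain in-memory buffer instead of
/// a real spill writer (illustrative only):
/// ```
/// let payload: &[u8] = b"proto_bytes";
/// let mut frame = Vec::with_capacity(4 + payload.len());
/// // Little-endian u32 length prefix, followed by the protobuf bytes.
/// frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
/// frame.extend_from_slice(payload);
/// assert_eq!(frame.len(), 4 + payload.len());
/// ```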
struct SortSpillManager {
    op: SpillOp,
    partition_num: usize,
    round_robin_idx: usize,
    input_writers: Vec<opendal::Writer>,
    input_chunk_builders: Vec<DataChunkBuilder>,
    child_data_types: Vec<DataType>,
    spill_chunk_size: usize,
    spill_metrics: Arc<BatchSpillMetrics>,
}

impl SortSpillManager {
    fn new(
        spill_backend: SpillBackend,
        executor_identity: &str,
        partition_num: usize,
        child_data_types: Vec<DataType>,
        spill_chunk_size: usize,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Result<Self> {
        let suffix_uuid = uuid::Uuid::new_v4();
        let dir = format!("/{}-{}/", executor_identity, suffix_uuid);
        let op = SpillOp::create(dir, spill_backend)?;
        let input_writers = Vec::with_capacity(partition_num);
        let input_chunk_builders = Vec::with_capacity(partition_num);
        Ok(Self {
            op,
            partition_num,
            input_writers,
            input_chunk_builders,
            round_robin_idx: 0,
            child_data_types,
            spill_chunk_size,
            spill_metrics,
        })
    }

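    /// Create one spill writer and one chunk builder per partition.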
    async fn init_writers(&mut self) -> Result<()> {
        for i in 0..self.partition_num {
            let partition_file_name = format!("input-chunks-p{}", i);
            let w = self.op.writer_with(&partition_file_name).await?;
            self.input_writers.push(w);
            self.input_chunk_builders.push(DataChunkBuilder::new(
                self.child_data_types.clone(),
                self.spill_chunk_size,
            ));
        }
        Ok(())
    }

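    /// Distribute rows round-robin across the partitions. Whenever a partition's
    /// chunk builder fills up, encode the chunk as length-prefixed protobuf bytes
    /// and append it to that partition's spill file.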
    async fn write_input_chunk(&mut self, chunk: DataChunk) -> Result<()> {
        for row in chunk.rows() {
            let partition = self.round_robin_idx;
            if let Some(chunk) = self.input_chunk_builders[partition].append_one_row(row) {
                let chunk_pb: PbDataChunk = chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
            self.round_robin_idx = (self.round_robin_idx + 1) % self.partition_num;
        }
        Ok(())
    }

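    /// Flush any partially filled chunk builders and close all partition writers.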
    async fn close_writers(&mut self) -> Result<()> {
        for partition in 0..self.partition_num {
            if let Some(output_chunk) = self.input_chunk_builders[partition].consume_all() {
                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
        }

        for mut w in self.input_writers.drain(..) {
            w.close().await?;
        }
        Ok(())
    }

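    /// Sequentially scan a partition's spill file and decode it back into a
    /// stream of data chunks.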
    async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let r = self.op.reader_with(&input_partition_file_name).await?;
        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
    }

    async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let input_size = self
            .op
            .stat(&input_partition_file_name)
            .await?
            .content_length();
        Ok(input_size)
    }
}

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::*;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{Date, F32, Interval, Scalar, StructType, Time, Timestamp};
    use risingwave_common::util::sort_util::OrderType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    #[tokio::test]
    async fn test_simple_order_by_executor() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i
             1 3
             2 2
             3 1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];

        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int32);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_int32().value_at(0), Some(3));
            assert_eq!(col0.as_int32().value_at(1), Some(2));
            assert_eq!(col0.as_int32().value_at(2), Some(1));
        }
    }

    #[tokio::test]
    async fn test_encoding_for_float() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f    F
             -2.2  3.3
             -1.1  2.2
              1.1  1.1
              2.2 -1.1
              3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }

    #[tokio::test]
    async fn test_bsc_for_string() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Varchar),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "T   T
             1.1 3.3
             2.2 2.2
             3.3 1.1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Varchar);
        assert_eq!(fields[1].data_type, DataType::Varchar);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_utf8().value_at(0), Some("3.3"));
            assert_eq!(col0.as_utf8().value_at(1), Some("2.2"));
            assert_eq!(col0.as_utf8().value_at(2), Some("1.1"));
        }
    }

    // TODO: write the following tests in a more concise way
    #[tokio::test]
    async fn test_encoding_for_boolean_int32_float64() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Boolean),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float64),
            ],
        };
        // f   3    .
        // t   3    .
        // .   .    3.5
        // .   .    -4.3
        // .   .    .
        let input_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([Some(false), Some(true), None, None, None]).into_ref(),
                I32Array::from_iter([Some(3), Some(3), None, None, None]).into_ref(),
                F64Array::from_iter([None, None, Some(3.5), Some(-4.3), None]).into_ref(),
            ],
            5,
        );
        // .   .   -4.3
        // .   .   3.5
        // .   .   .
        // f   3   .
        // t   3   .
        let output_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([None, None, None, Some(false), Some(true)]).into_ref(),
                I32Array::from_iter([None, None, None, Some(3), Some(3)]).into_ref(),
                F64Array::from_iter([Some(-4.3), Some(3.5), None, None, None]).into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_decimal_date_varchar() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Decimal),
                Field::unnamed(DataType::Date),
            ],
        };
        // abc       .     123
        // b         -3    789
        // abc       .     456
        // abcdefgh  .     .
        // b         7     345
        let input_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["abc", "b", "abc", "abcdefgh", "b"]).into_ref(),
                DecimalArray::from_iter([None, Some((-3).into()), None, None, Some(7.into())])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(345).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        // b         7     345
        // b         -3    789
        // abcdefgh  .     .
        // abc       .     123
        // abc       .     456
        let output_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["b", "b", "abcdefgh", "abc", "abc"]).into_ref(),
                DecimalArray::from_iter([Some(7.into()), Some((-3).into()), None, None, None])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(345).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_time_timestamp_interval() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Time),
                Field::unnamed(DataType::Timestamp),
                Field::unnamed(DataType::Interval),
            ],
        };
        // .     1:23  .
        // 4:56  4:56  1:2:3
        // .     7:89  .
        // 4:56  4:56  4:5:6
        // 7:89  .     .
        let input_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    None,
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        // 4:56  4:56  4:5:6
        // 4:56  4:56  1:2:3
        // 7:89  .     .
        // .     1:23  .
        // .     7:89  .
        let output_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                    None,
                    None,
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    None,
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            column_orders.into(),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_struct_list() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(
                    StructType::unnamed(vec![DataType::Varchar, DataType::Float32]).into(),
                ),
                Field::unnamed(DataType::Int64.list()),
            ],
        };
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder = ListArrayBuilder::with_type(0, DataType::Int64.list());
        // {abcd, -1.2}   .
        // {c, 0}         [1, ., 3]
        // {c, .}         .
        // {c, 0}         [2]
        // {., 3.4}       .
        let input_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder = ListArrayBuilder::with_type(0, DataType::Int64.list());
        // {abcd, -1.2}   .
        // {c, 0}         [2]
        // {c, 0}         [1, ., 3]
        // {c, .}         .
        // {., 3.4}       .
        let output_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_spill_out() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f    F
             -2.2  3.3
             -1.1  2.2
              1.1  1.1
              2.2 -1.1
              3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::for_spill_test(),
            Some(SpillBackend::Memory),
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }
}