risingwave_batch_executors/executor/
order_by.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::memcmp_encoding::encode_chunk;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::Message;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;

use super::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
    WrapStreamExecutor,
};
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillOp,
};
/// Sort Executor
///
/// High-level idea:
/// 1. Load data chunks from the child executor
/// 2. Serialize each row into memcomparable format
/// 3. Sort the serialized rows with quicksort
/// 4. Build and yield data chunks according to the sorted row order
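///
/// A minimal sketch of steps 2 and 3, mirroring the non-spilling path of
/// `do_execute` below (hypothetical local `chunk` and `column_orders`; error
/// handling elided):
///
/// ```ignore
/// // One memcomparable key per row; comparing keys bytewise is equivalent
/// // to comparing the rows under `column_orders`.
/// let keys = encode_chunk(&chunk, &column_orders)?;
/// let mut rows: Vec<_> = keys
///     .into_iter()
///     .enumerate()
///     .map(|(row_id, key)| (chunk.row_at_unchecked_vis(row_id), key))
///     .collect();
/// rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
/// ```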
pub struct SortExecutor {
    child: BoxedExecutor,
    column_orders: Arc<Vec<ColumnOrder>>,
    identity: String,
    schema: Schema,
    chunk_size: usize,
    mem_context: MemoryContext,
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    /// The upper bound of memory usage for this executor.
    memory_upper_bound: Option<u64>,
}

impl Executor for SortExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}

impl BoxedExecutorBuilder for SortExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        let [child]: [_; 1] = inputs.try_into().unwrap();

        let order_by_node =
            try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Sort)?;

        let column_orders = order_by_node
            .column_orders
            .iter()
            .map(ColumnOrder::from_protobuf)
            .collect_vec();

        let identity = source.plan_node().get_identity();
        Ok(Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            identity.clone(),
            source.context().get_config().developer.chunk_size,
            source.context().create_executor_mem_context(identity),
            if source.context().get_config().enable_spill {
                Some(Disk)
            } else {
                None
            },
            source.context().spill_metrics(),
        )))
    }
}

impl SortExecutor {
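    /// Buffers the entire input, encodes each row into a memcomparable key,
    /// and sorts in memory. If the tracked memory exceeds the limit and a
    /// spill backend is configured, falls back to external sort via
    /// `SortSpillManager`; otherwise an out-of-memory error is returned.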
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let child_schema = self.child.schema().clone();
        let mut need_to_spill = false;
        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
        let check_memory = match self.memory_upper_bound {
            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
            None => true,
        };

        let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
        let mut chunks = Vec::new_in(self.mem_context.global_allocator());

        let mut input_stream = self.child.execute();
        #[for_await]
        for chunk in &mut input_stream {
            let chunk = chunk?.compact();
            let chunk_estimated_heap_size = chunk.estimated_heap_size();
            chunks.push(chunk);
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

        let mut encoded_rows =
            Vec::with_capacity_in(chunks.len(), self.mem_context.global_allocator());

        for chunk in &chunks {
            let encoded_chunk = encode_chunk(chunk, &self.column_orders)?;
            let chunk_estimated_heap_size = encoded_chunk
                .iter()
                .map(|x| x.estimated_heap_size())
                .sum::<usize>();
            encoded_rows.extend(
                encoded_chunk
                    .into_iter()
                    .enumerate()
                    .map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
            );
            if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
                if self.spill_backend.is_some() {
                    need_to_spill = true;
                    break;
                } else {
                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
                }
            }
        }

        if need_to_spill {
            // A spilling version of sort, a.k.a. external sort.
            // When the `SortExecutor` runs out of memory, the `SortSpillManager` partitions the
            // sort buffer and spills it to disk. After spilling the sort buffer, it consumes the
            // remaining chunks from the input executor and spills them as well.
            // Finally, we get e.g. 20 partitions, each containing a portion of the original input
            // data. A sub `SortExecutor` is used to sort each partition respectively, and a
            // `MergeSortExecutor` then merges all sorted partitions.
            // If memory is still insufficient in a sub `SortExecutor`, it spills its input
            // recursively.
            info!("batch sort executor {} starts to spill out", &self.identity);
            let mut sort_spill_manager = SortSpillManager::new(
                self.spill_backend.clone().unwrap(),
                &self.identity,
                DEFAULT_SPILL_PARTITION_NUM,
                child_schema.data_types(),
                self.chunk_size,
                self.spill_metrics.clone(),
            )?;
            sort_spill_manager.init_writers().await?;

            // Release memory.
            drop(encoded_rows);

            // Spill the buffered chunks.
            for chunk in chunks {
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

            // Spill the remaining chunks from the input stream.
            #[for_await]
            for chunk in input_stream {
                let chunk: DataChunk = chunk?;
                sort_spill_manager.write_input_chunk(chunk).await?;
            }

            sort_spill_manager.close_writers().await?;

            let partition_num = sort_spill_manager.partition_num;
            // Merge the sorted partitions.
            let mut sorted_inputs: Vec<BoxedExecutor> = Vec::with_capacity(partition_num);
            for i in 0..partition_num {
                let partition_size = sort_spill_manager.estimate_partition_size(i).await?;

                let input_stream = sort_spill_manager.read_input_partition(i).await?;

                let sub_sort_executor: SortExecutor = SortExecutor::new_inner(
                    Box::new(WrapStreamExecutor::new(child_schema.clone(), input_stream)),
                    self.column_orders.clone(),
                    format!("{}-sub{}", self.identity.clone(), i),
                    self.chunk_size,
                    self.mem_context.clone(),
                    self.spill_backend.clone(),
                    self.spill_metrics.clone(),
                    Some(partition_size),
                );

                debug!(
                    "create sub_sort {} for sort {} to spill",
                    sub_sort_executor.identity, self.identity
                );

                sorted_inputs.push(Box::new(sub_sort_executor));
            }

            let merge_sort = MergeSortExecutor::new(
                sorted_inputs,
                self.column_orders.clone(),
                self.schema.clone(),
                format!("{}-merge-sort", self.identity.clone()),
                self.chunk_size,
                self.mem_context.clone(),
            );

            #[for_await]
            for chunk in Box::new(merge_sort).execute() {
                yield chunk?;
            }
        } else {
            encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

            for (row, _) in encoded_rows {
                if let Some(spilled) = chunk_builder.append_one_row(row) {
                    yield spilled
                }
            }

            if let Some(spilled) = chunk_builder.consume_all() {
                yield spilled
            }
        }
    }
}

impl SortExecutor {
    pub fn new(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Self {
        Self::new_inner(
            child,
            column_orders,
            identity,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            None,
        )
    }

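    /// Like `new`, but additionally takes `memory_upper_bound`. It is set for
    /// the sub-executors created while spilling, using the estimated partition
    /// size as the bound.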
    fn new_inner(
        child: BoxedExecutor,
        column_orders: Arc<Vec<ColumnOrder>>,
        identity: String,
        chunk_size: usize,
        mem_context: MemoryContext,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
        memory_upper_bound: Option<u64>,
    ) -> Self {
        let schema = child.schema().clone();
        Self {
            child,
            column_orders,
            identity,
            schema,
            chunk_size,
            mem_context,
            spill_backend,
            spill_metrics,
            memory_upper_bound,
        }
    }
}

/// `SortSpillManager` manages how spill data files are written and read back.
/// The spill data is first partitioned in a round-robin way; each partition is backed by one
/// file, `input_chunks_file`. The spill file consumes data chunks and serializes each chunk
/// into protobuf bytes, so its content looks like the following.
/// The write pattern is append-only and the read pattern is a sequential scan, which maximizes
/// disk IO performance.
///
/// ```text
/// [proto_len]
/// [proto_bytes]
/// ...
/// [proto_len]
/// [proto_bytes]
/// ```
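///
/// A sketch of how a single chunk is framed on write, mirroring
/// `write_input_chunk` below (error handling elided):
///
/// ```ignore
/// let chunk_pb: PbDataChunk = chunk.to_protobuf();
/// let buf = Message::encode_to_vec(&chunk_pb);
/// // 4-byte little-endian length prefix, then the protobuf payload.
/// let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
/// writer.write(len_bytes).await?;
/// writer.write(buf).await?;
/// ```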
struct SortSpillManager {
    op: SpillOp,
    partition_num: usize,
    round_robin_idx: usize,
    input_writers: Vec<opendal::Writer>,
    input_chunk_builders: Vec<DataChunkBuilder>,
    child_data_types: Vec<DataType>,
    spill_chunk_size: usize,
    spill_metrics: Arc<BatchSpillMetrics>,
}

impl SortSpillManager {
    fn new(
        spill_backend: SpillBackend,
        identity: &str,
        partition_num: usize,
        child_data_types: Vec<DataType>,
        spill_chunk_size: usize,
        spill_metrics: Arc<BatchSpillMetrics>,
    ) -> Result<Self> {
        let suffix_uuid = uuid::Uuid::new_v4();
        let dir = format!("/{}-{}/", identity, suffix_uuid);
        let op = SpillOp::create(dir, spill_backend)?;
        let input_writers = Vec::with_capacity(partition_num);
        let input_chunk_builders = Vec::with_capacity(partition_num);
        Ok(Self {
            op,
            partition_num,
            input_writers,
            input_chunk_builders,
            round_robin_idx: 0,
            child_data_types,
            spill_chunk_size,
            spill_metrics,
        })
    }

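    /// Creates one append-only spill file writer and one chunk builder per
    /// partition.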
    async fn init_writers(&mut self) -> Result<()> {
        for i in 0..self.partition_num {
            let partition_file_name = format!("input-chunks-p{}", i);
            let w = self.op.writer_with(&partition_file_name).await?;
            self.input_writers.push(w);
            self.input_chunk_builders.push(DataChunkBuilder::new(
                self.child_data_types.clone(),
                self.spill_chunk_size,
            ));
        }
        Ok(())
    }

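    /// Distributes the rows of `chunk` across the partitions in round-robin
    /// order; whenever a partition's chunk builder fills up, the chunk is
    /// flushed to that partition's spill file as one length-prefixed protobuf
    /// frame.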
    async fn write_input_chunk(&mut self, chunk: DataChunk) -> Result<()> {
        for row in chunk.rows() {
            let partition = self.round_robin_idx;
            if let Some(chunk) = self.input_chunk_builders[partition].append_one_row(row) {
                let chunk_pb: PbDataChunk = chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
            self.round_robin_idx = (self.round_robin_idx + 1) % self.partition_num;
        }
        Ok(())
    }

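    /// Flushes any partially filled chunk builders and closes all partition
    /// writers.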
    async fn close_writers(&mut self) -> Result<()> {
        for partition in 0..self.partition_num {
            if let Some(output_chunk) = self.input_chunk_builders[partition].consume_all() {
                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
                let buf = Message::encode_to_vec(&chunk_pb);
                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
                self.spill_metrics
                    .batch_spill_write_bytes
                    .inc_by((buf.len() + len_bytes.len()) as u64);
                self.input_writers[partition].write(len_bytes).await?;
                self.input_writers[partition].write(buf).await?;
            }
        }

        for mut w in self.input_writers.drain(..) {
            w.close().await?;
        }
        Ok(())
    }

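    /// Opens a partition's spill file and returns a stream that decodes the
    /// frames back into data chunks.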
    async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let r = self.op.reader_with(&input_partition_file_name).await?;
        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
    }

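    /// Returns the on-disk size of a partition's spill file. It is used as the
    /// memory upper bound for the sub `SortExecutor` that re-sorts this
    /// partition.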
    async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
        let input_partition_file_name = format!("input-chunks-p{}", partition);
        let input_size = self
            .op
            .stat(&input_partition_file_name)
            .await?
            .content_length();
        Ok(input_size)
    }
}

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::*;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{Date, F32, Interval, Scalar, StructType, Time, Timestamp};
    use risingwave_common::util::sort_util::OrderType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    #[tokio::test]
    async fn test_simple_order_by_executor() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i
             1 3
             2 2
             3 1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];

        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int32);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_int32().value_at(0), Some(3));
            assert_eq!(col0.as_int32().value_at(1), Some(2));
            assert_eq!(col0.as_int32().value_at(2), Some(1));
        }
    }

    #[tokio::test]
    async fn test_encoding_for_float() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f    F
             -2.2  3.3
             -1.1  2.2
              1.1  1.1
              2.2 -1.1
              3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }

    #[tokio::test]
    async fn test_bsc_for_string() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Varchar),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            "T   T
             1.1 3.3
             2.2 2.2
             3.3 1.1",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Varchar);
        assert_eq!(fields[1].data_type, DataType::Varchar);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_utf8().value_at(0), Some("3.3"));
            assert_eq!(col0.as_utf8().value_at(1), Some("2.2"));
            assert_eq!(col0.as_utf8().value_at(2), Some("1.1"));
        }
    }

    // TODO: write the following tests in a more concise way
    #[tokio::test]
    async fn test_encoding_for_boolean_int32_float64() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Boolean),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float64),
            ],
        };
        // f   3    .
        // t   3    .
        // .   .    3.5
        // .   .    -4.3
        // .   .    .
        let input_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([Some(false), Some(true), None, None, None]).into_ref(),
                I32Array::from_iter([Some(3), Some(3), None, None, None]).into_ref(),
                F64Array::from_iter([None, None, Some(3.5), Some(-4.3), None]).into_ref(),
            ],
            5,
        );
        // .   .   -4.3
        // .   .   3.5
        // .   .   .
        // f   3   .
        // t   3   .
        let output_chunk = DataChunk::new(
            vec![
                BoolArray::from_iter([None, None, None, Some(false), Some(true)]).into_ref(),
                I32Array::from_iter([None, None, None, Some(3), Some(3)]).into_ref(),
                F64Array::from_iter([Some(-4.3), Some(3.5), None, None, None]).into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_decimal_date_varchar() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Varchar),
                Field::unnamed(DataType::Decimal),
                Field::unnamed(DataType::Date),
            ],
        };
        // abc       .     123
        // b         -3    789
        // abc       .     456
        // abcdefgh  .     .
        // b         7     345
        let input_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["abc", "b", "abc", "abcdefgh", "b"]).into_ref(),
                DecimalArray::from_iter([None, Some((-3).into()), None, None, Some(7.into())])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(345).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        // b         7     345
        // b         -3    789
        // abcdefgh  .     .
        // abc       .     123
        // abc       .     456
        let output_chunk = DataChunk::new(
            vec![
                Utf8Array::from_iter(["b", "b", "abcdefgh", "abc", "abc"]).into_ref(),
                DecimalArray::from_iter([Some(7.into()), Some((-3).into()), None, None, None])
                    .into_ref(),
                DateArray::from_iter([
                    Some(Date::with_days_since_ce(345).unwrap()),
                    Some(Date::with_days_since_ce(789).unwrap()),
                    None,
                    Some(Date::with_days_since_ce(123).unwrap()),
                    Some(Date::with_days_since_ce(456).unwrap()),
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_time_timestamp_interval() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Time),
                Field::unnamed(DataType::Timestamp),
                Field::unnamed(DataType::Interval),
            ],
        };
        // .     1:23  .
        // 4:56  4:56  1:2:3
        // .     7:89  .
        // 4:56  4:56  4:5:6
        // 7:89  .     .
        let input_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    None,
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    None,
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        // 4:56  4:56  4:5:6
        // 4:56  4:56  1:2:3
        // 7:89  .     .
        // .     1:23  .
        // .     7:89  .
        let output_chunk = DataChunk::new(
            vec![
                TimeArray::from_iter([
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(4, 56).unwrap()),
                    Some(Time::with_secs_nano(7, 89).unwrap()),
                    None,
                    None,
                ])
                .into_ref(),
                TimestampArray::from_iter([
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    Some(Timestamp::with_secs_nsecs(4, 56).unwrap()),
                    None,
                    Some(Timestamp::with_secs_nsecs(1, 23).unwrap()),
                    Some(Timestamp::with_secs_nsecs(7, 89).unwrap()),
                ])
                .into_ref(),
                IntervalArray::from_iter([
                    Some(Interval::from_month_day_usec(4, 5, 6)),
                    Some(Interval::from_month_day_usec(1, 2, 3)),
                    None,
                    None,
                    None,
                ])
                .into_ref(),
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            column_orders.into(),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_encoding_for_struct_list() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(
                    StructType::unnamed(vec![DataType::Varchar, DataType::Float32]).into(),
                ),
                Field::unnamed(DataType::List(Box::new(DataType::Int64))),
            ],
        };
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder =
            ListArrayBuilder::with_type(0, DataType::List(Box::new(DataType::Int64)));
        // {abcd, -1.2}   .
        // {c, 0}         [1, ., 3]
        // {c, .}         .
        // {c, 0}         [2]
        // {., 3.4}       .
        let input_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut struct_builder = StructArrayBuilder::with_type(
            0,
            DataType::Struct(StructType::unnamed(vec![
                DataType::Varchar,
                DataType::Float32,
            ])),
        );
        let mut list_builder =
            ListArrayBuilder::with_type(0, DataType::List(Box::new(DataType::Int64)));
        // {abcd, -1.2}   .
        // {c, 0}         [2]
        // {c, 0}         [1, ., 3]
        // {c, .}         .
        // {., 3.4}       .
        let output_chunk = DataChunk::new(
            vec![
                {
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("abcd".into()),
                            Some(F32::from(-1.2).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![
                            Some("c".into()),
                            Some(F32::from(0.0).to_scalar_value()),
                        ]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![Some("c".into()), None]),
                    }));
                    struct_builder.append(Some(StructRef::ValueRef {
                        val: &StructValue::new(vec![None, Some(F32::from(3.4).to_scalar_value())]),
                    }));
                    struct_builder.finish().into_ref()
                },
                {
                    list_builder.append(None);
                    list_builder.append(Some(ListValue::from_iter([2i64]).as_scalar_ref()));
                    list_builder.append(Some(
                        ListValue::from_iter([Some(1i64), None, Some(3i64)]).as_scalar_ref(),
                    ));
                    list_builder.append(None);
                    list_builder.append(None);
                    list_builder.finish().into_ref()
                },
            ],
            5,
        );
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(input_chunk);
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor".to_owned(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ));

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert_eq!(res.unwrap().unwrap(), output_chunk)
    }

    #[tokio::test]
    async fn test_spill_out() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Float32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(DataChunk::from_pretty(
            " f    F
             -2.2  3.3
             -1.1  2.2
              1.1  1.1
              2.2 -1.1
              3.3 -2.2",
        ));
        let column_orders = vec![
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
        ];
        let order_by_executor = Box::new(SortExecutor::new(
            Box::new(mock_executor),
            Arc::new(column_orders),
            "SortExecutor2".to_owned(),
            CHUNK_SIZE,
            MemoryContext::for_spill_test(),
            Some(SpillBackend::Memory),
            BatchSpillMetrics::for_test(),
        ));
        let fields = &order_by_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Float32);
        assert_eq!(fields[1].data_type, DataType::Float64);

        let mut stream = order_by_executor.execute();
        let res = stream.next().await;
        assert!(res.is_some());
        if let Some(res) = res {
            let res = res.unwrap();
            let col0 = res.column_at(0);
            assert_eq!(col0.as_float32().value_at(0), Some(3.3.into()));
            assert_eq!(col0.as_float32().value_at(1), Some(2.2.into()));
            assert_eq!(col0.as_float32().value_at(2), Some(1.1.into()));
            assert_eq!(col0.as_float32().value_at(3), Some((-1.1).into()));
            assert_eq!(col0.as_float32().value_at(4), Some((-2.2).into()));
        }
    }
}