risingwave_common/util/
sort_util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt;
17use std::sync::Arc;
18
19use parse_display::Display;
20use risingwave_common_estimate_size::EstimateSize;
21use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
22
23use super::iter_util::ZipEqDebug;
24use crate::array::{Array, DataChunk};
25use crate::catalog::{FieldDisplay, Schema};
26use crate::dispatch_array_variants;
27use crate::row::Row;
28use crate::types::{DefaultOrdered, ToDatumRef};
29
/// Sort direction, ascending/descending.
///
/// The default direction is [`Direction::Ascending`]. The variants are
/// displayed as `ASC` and `DESC` respectively.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)]
pub enum Direction {
    /// Ascending order (the default), displayed as `ASC`.
    #[default]
    #[display("ASC")]
    Ascending,
    /// Descending order, displayed as `DESC`.
    #[display("DESC")]
    Descending,
}
39
40impl Direction {
41    fn from_protobuf(direction: &PbDirection) -> Self {
42        match direction {
43            PbDirection::Ascending => Self::Ascending,
44            PbDirection::Descending => Self::Descending,
45            PbDirection::Unspecified => unreachable!(),
46        }
47    }
48
49    fn to_protobuf(self) -> PbDirection {
50        match self {
51            Self::Ascending => PbDirection::Ascending,
52            Self::Descending => PbDirection::Descending,
53        }
54    }
55}
56
57impl Direction {
58    fn reverse(self) -> Self {
59        match self {
60            Self::Ascending => Self::Descending,
61            Self::Descending => Self::Ascending,
62        }
63    }
64}
65
/// Nulls are largest/smallest.
///
/// Describes how a null compares against non-null values, independently of
/// the sort direction. The default is [`NullsAre::Largest`].
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)]
enum NullsAre {
    /// Nulls compare greater than every non-null value (the default),
    /// displayed as `LARGEST`.
    #[default]
    #[display("LARGEST")]
    Largest,
    /// Nulls compare less than every non-null value, displayed as `SMALLEST`.
    #[display("SMALLEST")]
    Smallest,
}
75
76impl NullsAre {
77    fn from_protobuf(nulls_are: &PbNullsAre) -> Self {
78        match nulls_are {
79            PbNullsAre::Largest => Self::Largest,
80            PbNullsAre::Smallest => Self::Smallest,
81            PbNullsAre::Unspecified => unreachable!(),
82        }
83    }
84
85    fn to_protobuf(self) -> PbNullsAre {
86        match self {
87            Self::Largest => PbNullsAre::Largest,
88            Self::Smallest => PbNullsAre::Smallest,
89        }
90    }
91}
92
/// Order type of a column.
///
/// Combines a sort [`Direction`] with how nulls compare to non-null values
/// (`NullsAre`). The derived default is ascending with nulls treated as the
/// largest values.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Default)]
pub struct OrderType {
    // Whether values are sorted ascending or descending.
    direction: Direction,
    // Whether nulls compare as the largest or the smallest values.
    nulls_are: NullsAre,
}
99
100impl OrderType {
101    pub fn from_protobuf(order_type: &PbOrderType) -> OrderType {
102        OrderType {
103            direction: Direction::from_protobuf(&order_type.direction()),
104            nulls_are: NullsAre::from_protobuf(&order_type.nulls_are()),
105        }
106    }
107
108    pub fn to_protobuf(self) -> PbOrderType {
109        PbOrderType {
110            direction: self.direction.to_protobuf() as _,
111            nulls_are: self.nulls_are.to_protobuf() as _,
112        }
113    }
114}
115
116impl OrderType {
117    fn new(direction: Direction, nulls_are: NullsAre) -> Self {
118        Self {
119            direction,
120            nulls_are,
121        }
122    }
123
124    fn nulls_first(direction: Direction) -> Self {
125        match direction {
126            Direction::Ascending => Self::new(direction, NullsAre::Smallest),
127            Direction::Descending => Self::new(direction, NullsAre::Largest),
128        }
129    }
130
131    fn nulls_last(direction: Direction) -> Self {
132        match direction {
133            Direction::Ascending => Self::new(direction, NullsAre::Largest),
134            Direction::Descending => Self::new(direction, NullsAre::Smallest),
135        }
136    }
137
138    pub fn from_bools(asc: Option<bool>, nulls_first: Option<bool>) -> Self {
139        let direction = match asc {
140            None => Direction::default(),
141            Some(true) => Direction::Ascending,
142            Some(false) => Direction::Descending,
143        };
144        match nulls_first {
145            None => Self::new(direction, NullsAre::default()),
146            Some(true) => Self::nulls_first(direction),
147            Some(false) => Self::nulls_last(direction),
148        }
149    }
150
151    // TODO(rc): Many places that call `ascending` should've call `default`.
152    /// Create an `ASC` order type.
153    pub fn ascending() -> Self {
154        Self {
155            direction: Direction::Ascending,
156            nulls_are: NullsAre::default(),
157        }
158    }
159
160    /// Create a `DESC` order type.
161    pub fn descending() -> Self {
162        Self {
163            direction: Direction::Descending,
164            nulls_are: NullsAre::default(),
165        }
166    }
167
168    /// Create an `ASC NULLS FIRST` order type.
169    pub fn ascending_nulls_first() -> Self {
170        Self::nulls_first(Direction::Ascending)
171    }
172
173    /// Create an `ASC NULLS LAST` order type.
174    pub fn ascending_nulls_last() -> Self {
175        Self::nulls_last(Direction::Ascending)
176    }
177
178    /// Create a `DESC NULLS FIRST` order type.
179    pub fn descending_nulls_first() -> Self {
180        Self::nulls_first(Direction::Descending)
181    }
182
183    /// Create a `DESC NULLS LAST` order type.
184    pub fn descending_nulls_last() -> Self {
185        Self::nulls_last(Direction::Descending)
186    }
187
188    pub fn direction(&self) -> Direction {
189        self.direction
190    }
191
192    pub fn is_ascending(&self) -> bool {
193        self.direction == Direction::Ascending
194    }
195
196    pub fn is_descending(&self) -> bool {
197        self.direction == Direction::Descending
198    }
199
200    pub fn nulls_are_largest(&self) -> bool {
201        self.nulls_are == NullsAre::Largest
202    }
203
204    pub fn nulls_are_smallest(&self) -> bool {
205        self.nulls_are == NullsAre::Smallest
206    }
207
208    pub fn nulls_are_first(&self) -> bool {
209        self.is_ascending() && self.nulls_are_smallest()
210            || self.is_descending() && self.nulls_are_largest()
211    }
212
213    pub fn nulls_are_last(&self) -> bool {
214        !self.nulls_are_first()
215    }
216
217    pub fn reverse(self) -> Self {
218        Self::new(self.direction.reverse(), self.nulls_are)
219    }
220}
221
222impl fmt::Display for OrderType {
223    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224        write!(f, "{}", self.direction)?;
225        if self.nulls_are != NullsAre::default() {
226            write!(
227                f,
228                " NULLS {}",
229                if self.nulls_are_first() {
230                    "FIRST"
231                } else {
232                    "LAST"
233                }
234            )?;
235        }
236        Ok(())
237    }
238}
239
/// Column index with an order type (ASC or DESC). Used to represent a sort key
/// (`Vec<ColumnOrder>`).
///
/// Corresponds to protobuf [`PbColumnOrder`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ColumnOrder {
    /// Index of the column this order applies to.
    pub column_index: usize,
    /// How the column is ordered (direction and null placement).
    pub order_type: OrderType,
}
249
250impl ColumnOrder {
251    pub fn new(column_index: usize, order_type: OrderType) -> Self {
252        Self {
253            column_index,
254            order_type,
255        }
256    }
257
258    /// Shift the column index with offset.
259    pub fn shift_with_offset(&mut self, offset: isize) {
260        self.column_index = (self.column_index as isize + offset) as usize;
261    }
262}
263
264impl ColumnOrder {
265    pub fn from_protobuf(column_order: &PbColumnOrder) -> Self {
266        ColumnOrder {
267            column_index: column_order.column_index as _,
268            order_type: OrderType::from_protobuf(column_order.get_order_type().unwrap()),
269        }
270    }
271
272    pub fn to_protobuf(&self) -> PbColumnOrder {
273        PbColumnOrder {
274            column_index: self.column_index as _,
275            order_type: Some(self.order_type.to_protobuf()),
276        }
277    }
278}
279
280impl fmt::Display for ColumnOrder {
281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282        write!(f, "${} {}", self.column_index, self.order_type)
283    }
284}
285
286impl fmt::Debug for ColumnOrder {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        write!(f, "{}", self)
289    }
290}
291
/// Display helper that renders a [`ColumnOrder`] using the field found at its
/// column index in `input_schema`, instead of the raw column index.
pub struct ColumnOrderDisplay<'a> {
    /// The column order to render.
    pub column_order: &'a ColumnOrder,
    /// Schema whose `fields` are looked up by the column index.
    pub input_schema: &'a Schema,
}
296
297impl fmt::Display for ColumnOrderDisplay<'_> {
298    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299        let that = self.column_order;
300        write!(
301            f,
302            "{} {}",
303            FieldDisplay(self.input_schema.fields.get(that.column_index).unwrap()),
304            that.order_type
305        )
306    }
307}
308
/// A reference to one row of a `DataChunk`, ordered by a list of column
/// orders. Its `Ord` implementation is reversed with respect to the row order.
#[derive(Clone, Debug)]
pub struct HeapElem {
    // Sort key used when rows are compared column by column.
    column_orders: Arc<Vec<ColumnOrder>>,
    // The chunk that contains the row.
    chunk: DataChunk,
    // Caller-supplied index of the chunk; only stored and returned here
    // (presumably identifies the chunk among the caller's inputs — TODO confirm).
    chunk_idx: usize,
    // Index of the row inside `chunk` (and inside `encoded_chunk`, if present).
    elem_idx: usize,
    /// `DataChunk` can be encoded to accelerate the comparison.
    /// Use `risingwave_common::util::encoding_for_comparison::encode_chunk`
    /// to perform encoding, otherwise the comparison will be performed
    /// column by column.
    encoded_chunk: Option<Arc<Vec<Vec<u8>>>>,
    // Size reported through `EstimateSize`; computed once in `new` from the
    // capacities of the encoded rows (0 when there is no encoded chunk).
    estimated_size: usize,
}
322
323impl HeapElem {
324    pub fn new(
325        column_orders: Arc<Vec<ColumnOrder>>,
326        chunk: DataChunk,
327        chunk_idx: usize,
328        elem_idx: usize,
329        encoded_chunk: Option<Arc<Vec<Vec<u8>>>>,
330    ) -> Self {
331        let estimated_size = encoded_chunk
332            .as_ref()
333            .map(|inner| inner.iter().map(|i| i.capacity()).sum())
334            .unwrap_or(0);
335
336        Self {
337            column_orders,
338            chunk,
339            chunk_idx,
340            elem_idx,
341            encoded_chunk,
342            estimated_size,
343        }
344    }
345
346    #[inline(always)]
347    pub fn chunk_idx(&self) -> usize {
348        self.chunk_idx
349    }
350
351    #[inline(always)]
352    pub fn elem_idx(&self) -> usize {
353        self.elem_idx
354    }
355
356    pub fn chunk(&self) -> &DataChunk {
357        &self.chunk
358    }
359}
360
361impl Ord for HeapElem {
362    fn cmp(&self, other: &Self) -> Ordering {
363        let ord = if let (Some(lhs_encoded_chunk), Some(rhs_encoded_chunk)) =
364            (self.encoded_chunk.as_ref(), other.encoded_chunk.as_ref())
365        {
366            lhs_encoded_chunk[self.elem_idx]
367                .as_slice()
368                .cmp(rhs_encoded_chunk[other.elem_idx].as_slice())
369        } else {
370            compare_rows_in_chunk(
371                &self.chunk,
372                self.elem_idx,
373                &other.chunk,
374                other.elem_idx,
375                self.column_orders.as_ref(),
376            )
377        };
378        ord.reverse()
379    }
380}
381
impl EstimateSize for HeapElem {
    /// Returns the size estimate that was computed once when the element was
    /// created (see `HeapElem::new`).
    fn estimated_heap_size(&self) -> usize {
        self.estimated_size
    }
}
387
388impl PartialOrd for HeapElem {
389    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
390        Some(self.cmp(other))
391    }
392}
393
394impl PartialEq for HeapElem {
395    fn eq(&self, other: &Self) -> bool {
396        self.cmp(other) == Ordering::Equal
397    }
398}
399
// Marker impl: `PartialEq` for `HeapElem` is defined through `Ord::cmp`.
impl Eq for HeapElem {}
401
402fn generic_partial_cmp<T: PartialOrd>(
403    lhs: Option<&T>,
404    rhs: Option<&T>,
405    order_type: OrderType,
406) -> Option<Ordering> {
407    let ord = match (lhs, rhs, order_type.nulls_are) {
408        (Some(l), Some(r), _) => l.partial_cmp(r),
409        (None, None, _) => Some(Ordering::Equal),
410        (Some(_), None, NullsAre::Largest) => Some(Ordering::Less),
411        (Some(_), None, NullsAre::Smallest) => Some(Ordering::Greater),
412        (None, Some(_), NullsAre::Largest) => Some(Ordering::Greater),
413        (None, Some(_), NullsAre::Smallest) => Some(Ordering::Less),
414    };
415    ord.map(|o| {
416        if order_type.is_descending() {
417            o.reverse()
418        } else {
419            o
420        }
421    })
422}
423
424fn compare_values_in_array<'a, T>(
425    lhs_array: &'a T,
426    lhs_idx: usize,
427    rhs_array: &'a T,
428    rhs_idx: usize,
429    order_type: OrderType,
430) -> Ordering
431where
432    T: Array,
433    <T as Array>::RefItem<'a>: PartialOrd,
434{
435    generic_partial_cmp(
436        lhs_array.value_at(lhs_idx).as_ref(),
437        rhs_array.value_at(rhs_idx).as_ref(),
438        order_type,
439    )
440    .expect("items in the same `Array` type should be able to compare")
441}
442
/// Compares row `lhs_idx` of `lhs_data_chunk` with row `rhs_idx` of
/// `rhs_data_chunk`, column by column in the order given by `column_orders`.
///
/// Returns the result of the first column that does not compare equal, or
/// `Ordering::Equal` if all listed columns are equal (including when
/// `column_orders` is empty).
///
/// # Panics
///
/// Panics if the two arrays of a compared column have different types, or if
/// two values of a column cannot be ordered.
fn compare_rows_in_chunk(
    lhs_data_chunk: &DataChunk,
    lhs_idx: usize,
    rhs_data_chunk: &DataChunk,
    rhs_idx: usize,
    column_orders: &[ColumnOrder],
) -> Ordering {
    for column_order in column_orders {
        // Both chunks are read at the same column index.
        let lhs_array = lhs_data_chunk.column_at(column_order.column_index);
        let rhs_array = rhs_data_chunk.column_at(column_order.column_index);

        // Dispatch on the concrete type of the left array, then convert the
        // right array to that same type.
        let res = dispatch_array_variants!(&**lhs_array, lhs_inner, {
            #[expect(
                clippy::unnecessary_fallible_conversions,
                reason = "FIXME: Array's `into` is not safe. We should revise it."
            )]
            let rhs_inner = (&**rhs_array).try_into().unwrap_or_else(|_| {
                panic!(
                    "Unmatched array types, lhs array is: {}, rhs array is: {}",
                    lhs_array.get_ident(),
                    rhs_array.get_ident(),
                )
            });
            compare_values_in_array(
                lhs_inner,
                lhs_idx,
                rhs_inner,
                rhs_idx,
                column_order.order_type,
            )
        });

        // The first non-equal column decides the result.
        if res != Ordering::Equal {
            return res;
        }
    }
    Ordering::Equal
}
481
482/// Partial compare two `Datum`s with specified order type.
483pub fn partial_cmp_datum(
484    lhs: impl ToDatumRef,
485    rhs: impl ToDatumRef,
486    order_type: OrderType,
487) -> Option<Ordering> {
488    let lhs = lhs.to_datum_ref().map(DefaultOrdered);
489    let rhs = rhs.to_datum_ref().map(DefaultOrdered);
490    generic_partial_cmp(lhs.as_ref(), rhs.as_ref(), order_type)
491}
492
493/// Compare two `Datum`s with specified order type.
494///
495/// # Panics
496///
497/// Panics if the data types of `lhs` and `rhs` are not matched.
498pub fn cmp_datum(lhs: impl ToDatumRef, rhs: impl ToDatumRef, order_type: OrderType) -> Ordering {
499    let lhs = lhs.to_datum_ref();
500    let rhs = rhs.to_datum_ref();
501    partial_cmp_datum(lhs, rhs, order_type)
502        .unwrap_or_else(|| panic!("cannot compare {lhs:?} with {rhs:?}"))
503}
504
505/// Compare two `Datum` iterators with specified order types.
506pub fn partial_cmp_datum_iter(
507    lhs: impl IntoIterator<Item = impl ToDatumRef>,
508    rhs: impl IntoIterator<Item = impl ToDatumRef>,
509    order_types: impl IntoIterator<Item = OrderType>,
510) -> Option<Ordering> {
511    let mut order_types_iter = order_types.into_iter();
512    lhs.into_iter().partial_cmp_by(rhs, |x, y| {
513        let order_type = order_types_iter.next()?;
514        partial_cmp_datum(x, y, order_type)
515    })
516}
517
518/// Compare two `Datum` iterators with specified order types.
519///
520/// # Panics
521///
522/// Panics if the number of `OrderType`s is smaller than the number of `Datum`s,
523/// or if the data types of `lhs` and `rhs` are not matched.
524pub fn cmp_datum_iter(
525    lhs: impl IntoIterator<Item = impl ToDatumRef>,
526    rhs: impl IntoIterator<Item = impl ToDatumRef>,
527    order_types: impl IntoIterator<Item = OrderType>,
528) -> Ordering {
529    let mut order_types_iter = order_types.into_iter();
530    lhs.into_iter().cmp_by(rhs, |x, y| {
531        let order_type = order_types_iter
532            .next()
533            .expect("number of `OrderType`s is not enough");
534        cmp_datum(x, y, order_type)
535    })
536}
537
538/// Partial compare two `Row`s with specified order types.
539///
540/// NOTE: This function returns `None` if two rows have different schema.
541pub fn partial_cmp_rows(
542    lhs: impl Row,
543    rhs: impl Row,
544    order_types: &[OrderType],
545) -> Option<Ordering> {
546    if lhs.len() != rhs.len() {
547        return None;
548    }
549    lhs.iter()
550        .zip_eq_debug(rhs.iter())
551        .zip_eq_debug(order_types)
552        .try_fold(Ordering::Equal, |acc, ((l, r), order_type)| match acc {
553            Ordering::Equal => partial_cmp_datum(l, r, *order_type),
554            acc => Some(acc),
555        })
556}
557
558/// Compare two `Row`s with specified order types.
559///
560/// # Panics
561///
562/// Panics if the length of `lhs`, `rhs` and `order_types` are not equal,
563/// or, if the schemas of `lhs` and `rhs` are not matched.
564pub fn cmp_rows(lhs: impl Row, rhs: impl Row, order_types: &[OrderType]) -> Ordering {
565    assert_eq!(lhs.len(), rhs.len());
566    lhs.iter()
567        .zip_eq_debug(rhs.iter())
568        .zip_eq_debug(order_types)
569        .fold(Ordering::Equal, |acc, ((l, r), order_type)| match acc {
570            Ordering::Equal => cmp_datum(l, r, *order_type),
571            acc => acc,
572        })
573}
574
// Unit tests for order types and for the datum / row / chunk comparison helpers.
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::array::{ListValue, StructValue};
    use crate::row::OwnedRow;
    use crate::types::{DataType, Datum, ScalarImpl, StructType};

    // Checks the constructors, predicates and `reverse` of `OrderType`.
    #[test]
    fn test_order_type() {
        // The default order type is `ASC` with nulls largest, i.e. nulls last.
        assert_eq!(OrderType::default(), OrderType::ascending());
        assert_eq!(
            OrderType::default(),
            OrderType::new(Direction::Ascending, NullsAre::Largest)
        );
        assert_eq!(
            OrderType::default(),
            OrderType::from_bools(Some(true), Some(false))
        );
        assert_eq!(OrderType::default(), OrderType::from_bools(None, None));

        assert!(OrderType::ascending().is_ascending());
        assert!(OrderType::ascending().nulls_are_largest());
        assert!(OrderType::ascending().nulls_are_last());

        assert!(OrderType::descending().is_descending());
        assert!(OrderType::descending().nulls_are_largest());
        assert!(OrderType::descending().nulls_are_first());

        assert!(OrderType::ascending_nulls_first().is_ascending());
        assert!(OrderType::ascending_nulls_first().nulls_are_smallest());
        assert!(OrderType::ascending_nulls_first().nulls_are_first());

        assert!(OrderType::ascending_nulls_last().is_ascending());
        assert!(OrderType::ascending_nulls_last().nulls_are_largest());
        assert!(OrderType::ascending_nulls_last().nulls_are_last());

        assert!(OrderType::descending_nulls_first().is_descending());
        assert!(OrderType::descending_nulls_first().nulls_are_largest());
        assert!(OrderType::descending_nulls_first().nulls_are_first());

        assert!(OrderType::descending_nulls_last().is_descending());
        assert!(OrderType::descending_nulls_last().nulls_are_smallest());
        assert!(OrderType::descending_nulls_last().nulls_are_last());

        // `reverse` flips the direction but keeps the null ordering, so the
        // position of nulls (first/last) flips as well.
        assert_eq!(OrderType::ascending().reverse(), OrderType::descending());
        assert_eq!(OrderType::descending().reverse(), OrderType::ascending());
        assert_eq!(
            OrderType::ascending_nulls_first().reverse(),
            OrderType::descending_nulls_last()
        );
        assert_eq!(
            OrderType::ascending_nulls_last().reverse(),
            OrderType::descending_nulls_first()
        );
    }

    // Compares two rows of one chunk by (col 0 ASC, col 1 DESC).
    #[test]
    fn test_compare_rows_in_chunk() {
        let v10 = Some(ScalarImpl::Int32(42));
        let v11 = Some(ScalarImpl::Utf8("hello".into()));
        let v12 = Some(ScalarImpl::Float32(4.0.into()));
        let v20 = Some(ScalarImpl::Int32(42));
        let v21 = Some(ScalarImpl::Utf8("hell".into()));
        let v22 = Some(ScalarImpl::Float32(3.0.into()));

        let row1 = OwnedRow::new(vec![v10, v11, v12]);
        let row2 = OwnedRow::new(vec![v20, v21, v22]);
        let chunk = DataChunk::from_rows(
            &[row1, row2],
            &[DataType::Int32, DataType::Varchar, DataType::Float32],
        );
        let column_orders = vec![
            ColumnOrder::new(0, OrderType::ascending()),
            ColumnOrder::new(1, OrderType::descending()),
        ];

        assert_eq!(
            Ordering::Equal,
            compare_rows_in_chunk(&chunk, 0, &chunk, 0, &column_orders)
        );
        // Column 0 ties; "hello" > "hell", which is `Less` under DESC.
        assert_eq!(
            Ordering::Less,
            compare_rows_in_chunk(&chunk, 0, &chunk, 1, &column_orders)
        );
    }

    // Compares rows containing one column of each listed data type; the two
    // rows differ only in the struct column.
    #[test]
    fn test_compare_all_types() {
        let row1 = OwnedRow::new(vec![
            Some(ScalarImpl::Int16(16)),
            Some(ScalarImpl::Int32(32)),
            Some(ScalarImpl::Int64(64)),
            Some(ScalarImpl::Float32(3.2.into())),
            Some(ScalarImpl::Float64(6.4.into())),
            Some(ScalarImpl::Utf8("hello".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Decimal(10.into())),
            Some(ScalarImpl::Interval(Default::default())),
            Some(ScalarImpl::Date(Default::default())),
            Some(ScalarImpl::Timestamp(Default::default())),
            Some(ScalarImpl::Time(Default::default())),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Int32(1)),
                Some(ScalarImpl::Float32(3.0.into())),
            ]))),
            Some(ScalarImpl::List(ListValue::from_iter([1, 2]))),
        ]);
        let row2 = OwnedRow::new(vec![
            Some(ScalarImpl::Int16(16)),
            Some(ScalarImpl::Int32(32)),
            Some(ScalarImpl::Int64(64)),
            Some(ScalarImpl::Float32(3.2.into())),
            Some(ScalarImpl::Float64(6.4.into())),
            Some(ScalarImpl::Utf8("hello".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Decimal(10.into())),
            Some(ScalarImpl::Interval(Default::default())),
            Some(ScalarImpl::Date(Default::default())),
            Some(ScalarImpl::Timestamp(Default::default())),
            Some(ScalarImpl::Time(Default::default())),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Int32(1)),
                Some(ScalarImpl::Float32(33333.0.into())), // larger than row1
            ]))),
            Some(ScalarImpl::List(ListValue::from_iter([1, 2]))),
        ]);

        // Every column is part of the sort key, all ascending.
        let column_orders = (0..row1.len())
            .map(|i| ColumnOrder::new(i, OrderType::ascending()))
            .collect_vec();

        let chunk = DataChunk::from_rows(
            &[row1, row2],
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Float32,
                DataType::Float64,
                DataType::Varchar,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Date,
                DataType::Timestamp,
                DataType::Time,
                StructType::unnamed(vec![DataType::Int32, DataType::Float32]).into(),
                DataType::List(Box::new(DataType::Int32)),
            ],
        );
        assert_eq!(
            Ordering::Equal,
            compare_rows_in_chunk(&chunk, 0, &chunk, 0, &column_orders)
        );
        assert_eq!(
            Ordering::Less,
            compare_rows_in_chunk(&chunk, 0, &chunk, 1, &column_orders)
        );
    }

    // Shared assertions for datum comparison; `compare` is either `cmp_datum`
    // or an unwrapping adapter around `partial_cmp_datum`.
    fn common_compare_datum<CmpFn>(compare: CmpFn)
    where
        CmpFn: Fn(Datum, Datum, OrderType) -> Ordering,
    {
        assert_eq!(
            Ordering::Equal,
            compare(Some(42.into()), Some(42.into()), OrderType::default(),)
        );
        assert_eq!(Ordering::Equal, compare(None, None, OrderType::default(),));
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), Some(100.into()), OrderType::ascending(),)
        );
        // A non-null value sorts after a null under NULLS FIRST and before it
        // under NULLS LAST, regardless of direction.
        assert_eq!(
            Ordering::Greater,
            compare(Some(42.into()), None, OrderType::ascending_nulls_first(),)
        );
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), None, OrderType::ascending_nulls_last(),)
        );
        assert_eq!(
            Ordering::Greater,
            compare(Some(42.into()), None, OrderType::descending_nulls_first(),)
        );
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), None, OrderType::descending_nulls_last(),)
        );
    }

    // Shared assertions for row comparison; `compare` is either `cmp_rows`
    // or an unwrapping adapter around `partial_cmp_rows`.
    fn common_compare_rows<CmpFn>(compare: CmpFn)
    where
        CmpFn: Fn(OwnedRow, OwnedRow, &[OrderType]) -> Ordering,
    {
        assert_eq!(
            Ordering::Equal,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                &[OrderType::ascending(), OrderType::ascending()],
            )
        );

        assert_eq!(
            Ordering::Greater,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(100.into())]),
                &[OrderType::ascending(), OrderType::descending()],
            )
        );

        assert_eq!(
            Ordering::Less,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(100.into())]),
                &[OrderType::ascending(), OrderType::ascending()],
            )
        );
    }

    #[test]
    fn test_compare_datum() {
        common_compare_datum(cmp_datum);
    }

    #[test]
    fn test_compare_rows() {
        common_compare_rows(cmp_rows);
    }

    #[test]
    fn test_partial_compare_datum() {
        common_compare_datum(|lhs, rhs, order| partial_cmp_datum(lhs, rhs, order).unwrap());

        // Datums of different types are not comparable.
        assert_eq!(
            None,
            partial_cmp_datum(
                Some(ScalarImpl::from(42)),
                Some(ScalarImpl::from("abc")),
                OrderType::default()
            )
        )
    }

    #[test]
    fn test_partial_compare_rows() {
        common_compare_rows(|lhs, rhs, orders| partial_cmp_rows(lhs, rhs, orders).unwrap());

        // Rows whose columns have different types are not comparable.
        assert_eq!(
            None,
            partial_cmp_rows(
                OwnedRow::new(vec![Some(42.into())]),
                OwnedRow::new(vec![Some("abc".into())]),
                &[OrderType::default()]
            )
        )
    }
}