risingwave_common/util/
sort_util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt;
17use std::sync::Arc;
18
19use parse_display::Display;
20use risingwave_common_estimate_size::EstimateSize;
21use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
22
23use super::iter_util::ZipEqDebug;
24use crate::array::{Array, DataChunk};
25use crate::catalog::{FieldDisplay, Schema};
26use crate::dispatch_array_variants;
27use crate::row::Row;
28use crate::types::{DefaultOrdered, ToDatumRef};
29
/// Sort direction, ascending/descending.
///
/// The default direction is [`Direction::Ascending`].
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)]
pub enum Direction {
    /// Ascending order; displayed as `ASC`.
    #[default]
    #[display("ASC")]
    Ascending,
    /// Descending order; displayed as `DESC`.
    #[display("DESC")]
    Descending,
}
39
40impl Direction {
41    fn from_protobuf(direction: &PbDirection) -> Self {
42        match direction {
43            PbDirection::Ascending => Self::Ascending,
44            PbDirection::Descending => Self::Descending,
45            PbDirection::Unspecified => unreachable!(),
46        }
47    }
48
49    fn to_protobuf(self) -> PbDirection {
50        match self {
51            Self::Ascending => PbDirection::Ascending,
52            Self::Descending => PbDirection::Descending,
53        }
54    }
55}
56
57impl Direction {
58    fn reverse(self) -> Self {
59        match self {
60            Self::Ascending => Self::Descending,
61            Self::Descending => Self::Ascending,
62        }
63    }
64}
65
/// Nulls are largest/smallest.
///
/// The default is [`NullsAre::Largest`].
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)]
enum NullsAre {
    /// Nulls compare greater than every non-null value; displayed as `LARGEST`.
    #[default]
    #[display("LARGEST")]
    Largest,
    /// Nulls compare less than every non-null value; displayed as `SMALLEST`.
    #[display("SMALLEST")]
    Smallest,
}
75
76impl NullsAre {
77    fn from_protobuf(nulls_are: &PbNullsAre) -> Self {
78        match nulls_are {
79            PbNullsAre::Largest => Self::Largest,
80            PbNullsAre::Smallest => Self::Smallest,
81            PbNullsAre::Unspecified => unreachable!(),
82        }
83    }
84
85    fn to_protobuf(self) -> PbNullsAre {
86        match self {
87            Self::Largest => PbNullsAre::Largest,
88            Self::Smallest => PbNullsAre::Smallest,
89        }
90    }
91}
92
/// Order type of a column.
///
/// Combines a sort [`Direction`] with the way nulls compare against non-null values.
/// The derived default uses the default of each field.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Default)]
pub struct OrderType {
    /// Whether values are sorted ascending or descending.
    direction: Direction,
    /// Whether nulls are treated as the largest or the smallest value.
    nulls_are: NullsAre,
}
99
100impl OrderType {
101    pub fn from_protobuf(order_type: &PbOrderType) -> OrderType {
102        OrderType {
103            direction: Direction::from_protobuf(&order_type.direction()),
104            nulls_are: NullsAre::from_protobuf(&order_type.nulls_are()),
105        }
106    }
107
108    pub fn to_protobuf(self) -> PbOrderType {
109        PbOrderType {
110            direction: self.direction.to_protobuf() as _,
111            nulls_are: self.nulls_are.to_protobuf() as _,
112        }
113    }
114}
115
116impl OrderType {
117    fn new(direction: Direction, nulls_are: NullsAre) -> Self {
118        Self {
119            direction,
120            nulls_are,
121        }
122    }
123
124    fn nulls_first(direction: Direction) -> Self {
125        match direction {
126            Direction::Ascending => Self::new(direction, NullsAre::Smallest),
127            Direction::Descending => Self::new(direction, NullsAre::Largest),
128        }
129    }
130
131    fn nulls_last(direction: Direction) -> Self {
132        match direction {
133            Direction::Ascending => Self::new(direction, NullsAre::Largest),
134            Direction::Descending => Self::new(direction, NullsAre::Smallest),
135        }
136    }
137
138    pub fn from_bools(asc: Option<bool>, nulls_first: Option<bool>) -> Self {
139        let direction = match asc {
140            None => Direction::default(),
141            Some(true) => Direction::Ascending,
142            Some(false) => Direction::Descending,
143        };
144        match nulls_first {
145            None => Self::new(direction, NullsAre::default()),
146            Some(true) => Self::nulls_first(direction),
147            Some(false) => Self::nulls_last(direction),
148        }
149    }
150
151    // TODO(rc): Many places that call `ascending` should've call `default`.
152    /// Create an `ASC` order type.
153    pub fn ascending() -> Self {
154        Self {
155            direction: Direction::Ascending,
156            nulls_are: NullsAre::default(),
157        }
158    }
159
160    /// Create a `DESC` order type.
161    pub fn descending() -> Self {
162        Self {
163            direction: Direction::Descending,
164            nulls_are: NullsAre::default(),
165        }
166    }
167
168    /// Create an `ASC NULLS FIRST` order type.
169    pub fn ascending_nulls_first() -> Self {
170        Self::nulls_first(Direction::Ascending)
171    }
172
173    /// Create an `ASC NULLS LAST` order type.
174    pub fn ascending_nulls_last() -> Self {
175        Self::nulls_last(Direction::Ascending)
176    }
177
178    /// Create a `DESC NULLS FIRST` order type.
179    pub fn descending_nulls_first() -> Self {
180        Self::nulls_first(Direction::Descending)
181    }
182
183    /// Create a `DESC NULLS LAST` order type.
184    pub fn descending_nulls_last() -> Self {
185        Self::nulls_last(Direction::Descending)
186    }
187
188    pub fn direction(&self) -> Direction {
189        self.direction
190    }
191
192    pub fn is_ascending(&self) -> bool {
193        self.direction == Direction::Ascending
194    }
195
196    pub fn is_descending(&self) -> bool {
197        self.direction == Direction::Descending
198    }
199
200    pub fn nulls_are_largest(&self) -> bool {
201        self.nulls_are == NullsAre::Largest
202    }
203
204    pub fn nulls_are_smallest(&self) -> bool {
205        self.nulls_are == NullsAre::Smallest
206    }
207
208    pub fn nulls_are_first(&self) -> bool {
209        self.is_ascending() && self.nulls_are_smallest()
210            || self.is_descending() && self.nulls_are_largest()
211    }
212
213    pub fn nulls_are_last(&self) -> bool {
214        !self.nulls_are_first()
215    }
216
217    pub fn reverse(self) -> Self {
218        Self::new(self.direction.reverse(), self.nulls_are)
219    }
220
221    pub fn all() -> Vec<Self> {
222        vec![
223            Self::ascending_nulls_first(),
224            Self::ascending_nulls_last(),
225            Self::descending_nulls_first(),
226            Self::descending_nulls_last(),
227        ]
228    }
229}
230
231impl fmt::Display for OrderType {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        write!(f, "{}", self.direction)?;
234        if self.nulls_are != NullsAre::default() {
235            write!(
236                f,
237                " NULLS {}",
238                if self.nulls_are_first() {
239                    "FIRST"
240                } else {
241                    "LAST"
242                }
243            )?;
244        }
245        Ok(())
246    }
247}
248
/// Column index with an order type (ASC or DESC). Used to represent a sort key
/// (`Vec<ColumnOrder>`).
///
/// Corresponds to protobuf [`PbColumnOrder`].
#[derive(Clone, PartialEq, Eq, Hash, Copy)]
pub struct ColumnOrder {
    /// Index of the column this order applies to.
    pub column_index: usize,
    /// How values of that column are ordered.
    pub order_type: OrderType,
}
258
259impl ColumnOrder {
260    pub fn new(column_index: usize, order_type: OrderType) -> Self {
261        Self {
262            column_index,
263            order_type,
264        }
265    }
266
267    /// Shift the column index with offset.
268    pub fn shift_with_offset(&mut self, offset: isize) {
269        self.column_index = (self.column_index as isize + offset) as usize;
270    }
271}
272
273impl ColumnOrder {
274    pub fn from_protobuf(column_order: &PbColumnOrder) -> Self {
275        ColumnOrder {
276            column_index: column_order.column_index as _,
277            order_type: OrderType::from_protobuf(column_order.get_order_type().unwrap()),
278        }
279    }
280
281    pub fn to_protobuf(self) -> PbColumnOrder {
282        PbColumnOrder {
283            column_index: self.column_index as _,
284            order_type: Some(self.order_type.to_protobuf()),
285        }
286    }
287}
288
289impl fmt::Display for ColumnOrder {
290    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291        write!(f, "${} {}", self.column_index, self.order_type)
292    }
293}
294
295impl fmt::Debug for ColumnOrder {
296    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297        write!(f, "{}", self)
298    }
299}
300
/// Display adapter that pairs a [`ColumnOrder`] with the [`Schema`] its column
/// index is looked up in when formatting.
pub struct ColumnOrderDisplay<'a> {
    /// The column order to display.
    pub column_order: &'a ColumnOrder,
    /// Schema whose `fields` are indexed by `column_order.column_index`.
    pub input_schema: &'a Schema,
}
305
306impl fmt::Display for ColumnOrderDisplay<'_> {
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        let that = self.column_order;
309        write!(
310            f,
311            "{} {}",
312            FieldDisplay(self.input_schema.fields.get(that.column_index).unwrap()),
313            that.order_type
314        )
315    }
316}
317
/// A reference to one row of a [`DataChunk`], comparable by a list of column
/// orders (or by pre-encoded bytes when available).
#[derive(Clone, Debug)]
pub struct HeapElem {
    /// Sort key used when comparing column by column.
    column_orders: Arc<Vec<ColumnOrder>>,
    /// The chunk containing the row.
    chunk: DataChunk,
    /// Caller-supplied index of the chunk; only stored and returned here.
    chunk_idx: usize,
    /// Index of the row within `chunk` (and within `encoded_chunk`, if present).
    elem_idx: usize,
    /// `DataChunk` can be encoded to accelerate the comparison.
    /// Use `risingwave_common::util::encoding_for_comparison::encode_chunk`
    /// to perform encoding, otherwise the comparison will be performed
    /// column by column.
    encoded_chunk: Option<Arc<Vec<Vec<u8>>>>,
    /// Heap size estimate computed once at construction time.
    estimated_size: usize,
}
331
332impl HeapElem {
333    pub fn new(
334        column_orders: Arc<Vec<ColumnOrder>>,
335        chunk: DataChunk,
336        chunk_idx: usize,
337        elem_idx: usize,
338        encoded_chunk: Option<Arc<Vec<Vec<u8>>>>,
339    ) -> Self {
340        let estimated_size = encoded_chunk
341            .as_ref()
342            .map(|inner| inner.iter().map(|i| i.capacity()).sum())
343            .unwrap_or(0);
344
345        Self {
346            column_orders,
347            chunk,
348            chunk_idx,
349            elem_idx,
350            encoded_chunk,
351            estimated_size,
352        }
353    }
354
355    #[inline(always)]
356    pub fn chunk_idx(&self) -> usize {
357        self.chunk_idx
358    }
359
360    #[inline(always)]
361    pub fn elem_idx(&self) -> usize {
362        self.elem_idx
363    }
364
365    pub fn chunk(&self) -> &DataChunk {
366        &self.chunk
367    }
368}
369
370impl Ord for HeapElem {
371    fn cmp(&self, other: &Self) -> Ordering {
372        let ord = if let (Some(lhs_encoded_chunk), Some(rhs_encoded_chunk)) =
373            (self.encoded_chunk.as_ref(), other.encoded_chunk.as_ref())
374        {
375            lhs_encoded_chunk[self.elem_idx]
376                .as_slice()
377                .cmp(rhs_encoded_chunk[other.elem_idx].as_slice())
378        } else {
379            compare_rows_in_chunk(
380                &self.chunk,
381                self.elem_idx,
382                &other.chunk,
383                other.elem_idx,
384                self.column_orders.as_ref(),
385            )
386        };
387        ord.reverse()
388    }
389}
390
391impl EstimateSize for HeapElem {
392    fn estimated_heap_size(&self) -> usize {
393        self.estimated_size
394    }
395}
396
397impl PartialOrd for HeapElem {
398    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
399        Some(self.cmp(other))
400    }
401}
402
403impl PartialEq for HeapElem {
404    fn eq(&self, other: &Self) -> bool {
405        self.cmp(other) == Ordering::Equal
406    }
407}
408
// Marker impl: equality is defined by `PartialEq` above, which delegates to `Ord::cmp`.
impl Eq for HeapElem {}
410
411fn generic_partial_cmp<T: PartialOrd>(
412    lhs: Option<&T>,
413    rhs: Option<&T>,
414    order_type: OrderType,
415) -> Option<Ordering> {
416    let ord = match (lhs, rhs, order_type.nulls_are) {
417        (Some(l), Some(r), _) => l.partial_cmp(r),
418        (None, None, _) => Some(Ordering::Equal),
419        (Some(_), None, NullsAre::Largest) => Some(Ordering::Less),
420        (Some(_), None, NullsAre::Smallest) => Some(Ordering::Greater),
421        (None, Some(_), NullsAre::Largest) => Some(Ordering::Greater),
422        (None, Some(_), NullsAre::Smallest) => Some(Ordering::Less),
423    };
424    ord.map(|o| {
425        if order_type.is_descending() {
426            o.reverse()
427        } else {
428            o
429        }
430    })
431}
432
433fn compare_values_in_array<'a, T>(
434    lhs_array: &'a T,
435    lhs_idx: usize,
436    rhs_array: &'a T,
437    rhs_idx: usize,
438    order_type: OrderType,
439) -> Ordering
440where
441    T: Array,
442    <T as Array>::RefItem<'a>: PartialOrd,
443{
444    generic_partial_cmp(
445        lhs_array.value_at(lhs_idx).as_ref(),
446        rhs_array.value_at(rhs_idx).as_ref(),
447        order_type,
448    )
449    .expect("items in the same `Array` type should be able to compare")
450}
451
/// Compares row `lhs_idx` of `lhs_data_chunk` with row `rhs_idx` of
/// `rhs_data_chunk`, column by column, following `column_orders`.
///
/// Returns the first non-equal per-column result, or `Ordering::Equal` when
/// every listed column compares equal (including when `column_orders` is empty).
///
/// # Panics
///
/// Panics if, for some column, the right-hand array cannot be converted to the
/// left-hand array's variant, or if the two values cannot be compared.
fn compare_rows_in_chunk(
    lhs_data_chunk: &DataChunk,
    lhs_idx: usize,
    rhs_data_chunk: &DataChunk,
    rhs_idx: usize,
    column_orders: &[ColumnOrder],
) -> Ordering {
    for column_order in column_orders {
        // Both chunks are indexed with the same column index.
        let lhs_array = lhs_data_chunk.column_at(column_order.column_index);
        let rhs_array = rhs_data_chunk.column_at(column_order.column_index);

        // Dispatch on the left array's variant, then convert the right array
        // to the same variant before comparing.
        let res = dispatch_array_variants!(&**lhs_array, lhs_inner, {
            #[expect(
                clippy::unnecessary_fallible_conversions,
                reason = "FIXME: Array's `into` is not safe. We should revise it."
            )]
            let rhs_inner = (&**rhs_array).try_into().unwrap_or_else(|_| {
                panic!(
                    "Unmatched array types, lhs array is: {}, rhs array is: {}",
                    lhs_array.get_ident(),
                    rhs_array.get_ident(),
                )
            });
            compare_values_in_array(
                lhs_inner,
                lhs_idx,
                rhs_inner,
                rhs_idx,
                column_order.order_type,
            )
        });

        // The first column that differs decides the result.
        if res != Ordering::Equal {
            return res;
        }
    }
    Ordering::Equal
}
490
491/// Partial compare two `Datum`s with specified order type.
492pub fn partial_cmp_datum(
493    lhs: impl ToDatumRef,
494    rhs: impl ToDatumRef,
495    order_type: OrderType,
496) -> Option<Ordering> {
497    let lhs = lhs.to_datum_ref().map(DefaultOrdered);
498    let rhs = rhs.to_datum_ref().map(DefaultOrdered);
499    generic_partial_cmp(lhs.as_ref(), rhs.as_ref(), order_type)
500}
501
502/// Compare two `Datum`s with specified order type.
503///
504/// # Panics
505///
506/// Panics if the data types of `lhs` and `rhs` are not matched.
507pub fn cmp_datum(lhs: impl ToDatumRef, rhs: impl ToDatumRef, order_type: OrderType) -> Ordering {
508    let lhs = lhs.to_datum_ref();
509    let rhs = rhs.to_datum_ref();
510    partial_cmp_datum(lhs, rhs, order_type)
511        .unwrap_or_else(|| panic!("cannot compare {lhs:?} with {rhs:?}"))
512}
513
514/// Compare two `Datum` iterators with specified order types.
515pub fn partial_cmp_datum_iter(
516    lhs: impl IntoIterator<Item = impl ToDatumRef>,
517    rhs: impl IntoIterator<Item = impl ToDatumRef>,
518    order_types: impl IntoIterator<Item = OrderType>,
519) -> Option<Ordering> {
520    let mut order_types_iter = order_types.into_iter();
521    lhs.into_iter().partial_cmp_by(rhs, |x, y| {
522        let order_type = order_types_iter.next()?;
523        partial_cmp_datum(x, y, order_type)
524    })
525}
526
527/// Compare two `Datum` iterators with specified order types.
528///
529/// # Panics
530///
531/// Panics if the number of `OrderType`s is smaller than the number of `Datum`s,
532/// or if the data types of `lhs` and `rhs` are not matched.
533pub fn cmp_datum_iter(
534    lhs: impl IntoIterator<Item = impl ToDatumRef>,
535    rhs: impl IntoIterator<Item = impl ToDatumRef>,
536    order_types: impl IntoIterator<Item = OrderType>,
537) -> Ordering {
538    let mut order_types_iter = order_types.into_iter();
539    lhs.into_iter().cmp_by(rhs, |x, y| {
540        let order_type = order_types_iter
541            .next()
542            .expect("number of `OrderType`s is not enough");
543        cmp_datum(x, y, order_type)
544    })
545}
546
547/// Partial compare two `Row`s with specified order types.
548///
549/// NOTE: This function returns `None` if two rows have different schema.
550pub fn partial_cmp_rows(
551    lhs: impl Row,
552    rhs: impl Row,
553    order_types: &[OrderType],
554) -> Option<Ordering> {
555    if lhs.len() != rhs.len() {
556        return None;
557    }
558    lhs.iter()
559        .zip_eq_debug(rhs.iter())
560        .zip_eq_debug(order_types)
561        .try_fold(Ordering::Equal, |acc, ((l, r), order_type)| match acc {
562            Ordering::Equal => partial_cmp_datum(l, r, *order_type),
563            acc => Some(acc),
564        })
565}
566
567/// Compare two `Row`s with specified order types.
568///
569/// # Panics
570///
571/// Panics if the length of `lhs`, `rhs` and `order_types` are not equal,
572/// or, if the schemas of `lhs` and `rhs` are not matched.
573pub fn cmp_rows(lhs: impl Row, rhs: impl Row, order_types: &[OrderType]) -> Ordering {
574    assert_eq!(lhs.len(), rhs.len());
575    lhs.iter()
576        .zip_eq_debug(rhs.iter())
577        .zip_eq_debug(order_types)
578        .fold(Ordering::Equal, |acc, ((l, r), order_type)| match acc {
579            Ordering::Equal => cmp_datum(l, r, *order_type),
580            acc => acc,
581        })
582}
583
// Unit tests for order types and the datum/row/chunk comparison helpers.
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::array::{ListValue, StructValue};
    use crate::row::OwnedRow;
    use crate::types::{DataType, Datum, ScalarImpl, StructType};

    /// Checks defaults, constructors, predicates and `reverse` of `OrderType`.
    #[test]
    fn test_order_type() {
        // The default order type is ascending with nulls largest (i.e. nulls last).
        assert_eq!(OrderType::default(), OrderType::ascending());
        assert_eq!(
            OrderType::default(),
            OrderType::new(Direction::Ascending, NullsAre::Largest)
        );
        assert_eq!(
            OrderType::default(),
            OrderType::from_bools(Some(true), Some(false))
        );
        assert_eq!(OrderType::default(), OrderType::from_bools(None, None));

        assert!(OrderType::ascending().is_ascending());
        assert!(OrderType::ascending().nulls_are_largest());
        assert!(OrderType::ascending().nulls_are_last());

        // With nulls largest, a descending order puts nulls first.
        assert!(OrderType::descending().is_descending());
        assert!(OrderType::descending().nulls_are_largest());
        assert!(OrderType::descending().nulls_are_first());

        assert!(OrderType::ascending_nulls_first().is_ascending());
        assert!(OrderType::ascending_nulls_first().nulls_are_smallest());
        assert!(OrderType::ascending_nulls_first().nulls_are_first());

        assert!(OrderType::ascending_nulls_last().is_ascending());
        assert!(OrderType::ascending_nulls_last().nulls_are_largest());
        assert!(OrderType::ascending_nulls_last().nulls_are_last());

        assert!(OrderType::descending_nulls_first().is_descending());
        assert!(OrderType::descending_nulls_first().nulls_are_largest());
        assert!(OrderType::descending_nulls_first().nulls_are_first());

        assert!(OrderType::descending_nulls_last().is_descending());
        assert!(OrderType::descending_nulls_last().nulls_are_smallest());
        assert!(OrderType::descending_nulls_last().nulls_are_last());

        // `reverse` flips the direction but keeps the nulls-are setting, so
        // the first/last placement of nulls flips as well.
        assert_eq!(OrderType::ascending().reverse(), OrderType::descending());
        assert_eq!(OrderType::descending().reverse(), OrderType::ascending());
        assert_eq!(
            OrderType::ascending_nulls_first().reverse(),
            OrderType::descending_nulls_last()
        );
        assert_eq!(
            OrderType::ascending_nulls_last().reverse(),
            OrderType::descending_nulls_first()
        );
    }

    /// Compares two rows of one chunk using a two-column sort key.
    #[test]
    fn test_compare_rows_in_chunk() {
        let v10 = Some(ScalarImpl::Int32(42));
        let v11 = Some(ScalarImpl::Utf8("hello".into()));
        let v12 = Some(ScalarImpl::Float32(4.0.into()));
        let v20 = Some(ScalarImpl::Int32(42));
        let v21 = Some(ScalarImpl::Utf8("hell".into()));
        let v22 = Some(ScalarImpl::Float32(3.0.into()));

        let row1 = OwnedRow::new(vec![v10, v11, v12]);
        let row2 = OwnedRow::new(vec![v20, v21, v22]);
        let chunk = DataChunk::from_rows(
            &[row1, row2],
            &[DataType::Int32, DataType::Varchar, DataType::Float32],
        );
        // Only columns 0 and 1 take part in the comparison; column 1 is descending.
        let column_orders = vec![
            ColumnOrder::new(0, OrderType::ascending()),
            ColumnOrder::new(1, OrderType::descending()),
        ];

        assert_eq!(
            Ordering::Equal,
            compare_rows_in_chunk(&chunk, 0, &chunk, 0, &column_orders)
        );
        // Column 0 ties; "hello" vs "hell" under DESC is expected to give `Less`.
        assert_eq!(
            Ordering::Less,
            compare_rows_in_chunk(&chunk, 0, &chunk, 1, &column_orders)
        );
    }

    /// Compares rows containing one column per listed data type; the rows
    /// differ only in the second field of the struct column.
    #[test]
    fn test_compare_all_types() {
        let row1 = OwnedRow::new(vec![
            Some(ScalarImpl::Int16(16)),
            Some(ScalarImpl::Int32(32)),
            Some(ScalarImpl::Int64(64)),
            Some(ScalarImpl::Float32(3.2.into())),
            Some(ScalarImpl::Float64(6.4.into())),
            Some(ScalarImpl::Utf8("hello".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Decimal(10.into())),
            Some(ScalarImpl::Interval(Default::default())),
            Some(ScalarImpl::Date(Default::default())),
            Some(ScalarImpl::Timestamp(Default::default())),
            Some(ScalarImpl::Time(Default::default())),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Int32(1)),
                Some(ScalarImpl::Float32(3.0.into())),
            ]))),
            Some(ScalarImpl::List(ListValue::from_iter([1, 2]))),
        ]);
        let row2 = OwnedRow::new(vec![
            Some(ScalarImpl::Int16(16)),
            Some(ScalarImpl::Int32(32)),
            Some(ScalarImpl::Int64(64)),
            Some(ScalarImpl::Float32(3.2.into())),
            Some(ScalarImpl::Float64(6.4.into())),
            Some(ScalarImpl::Utf8("hello".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Decimal(10.into())),
            Some(ScalarImpl::Interval(Default::default())),
            Some(ScalarImpl::Date(Default::default())),
            Some(ScalarImpl::Timestamp(Default::default())),
            Some(ScalarImpl::Time(Default::default())),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Int32(1)),
                Some(ScalarImpl::Float32(33333.0.into())), // larger than row1
            ]))),
            Some(ScalarImpl::List(ListValue::from_iter([1, 2]))),
        ]);

        // Every column participates, all ascending.
        let column_orders = (0..row1.len())
            .map(|i| ColumnOrder::new(i, OrderType::ascending()))
            .collect_vec();

        let chunk = DataChunk::from_rows(
            &[row1, row2],
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Float32,
                DataType::Float64,
                DataType::Varchar,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Date,
                DataType::Timestamp,
                DataType::Time,
                StructType::unnamed(vec![DataType::Int32, DataType::Float32]).into(),
                DataType::List(Box::new(DataType::Int32)),
            ],
        );
        assert_eq!(
            Ordering::Equal,
            compare_rows_in_chunk(&chunk, 0, &chunk, 0, &column_orders)
        );
        assert_eq!(
            Ordering::Less,
            compare_rows_in_chunk(&chunk, 0, &chunk, 1, &column_orders)
        );
    }

    /// Shared assertions for any datum comparison function with the
    /// `(Datum, Datum, OrderType) -> Ordering` shape.
    fn common_compare_datum<CmpFn>(compare: CmpFn)
    where
        CmpFn: Fn(Datum, Datum, OrderType) -> Ordering,
    {
        assert_eq!(
            Ordering::Equal,
            compare(Some(42.into()), Some(42.into()), OrderType::default(),)
        );
        // Two nulls compare equal.
        assert_eq!(Ordering::Equal, compare(None, None, OrderType::default(),));
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), Some(100.into()), OrderType::ascending(),)
        );
        // A non-null value sorts after null with NULLS FIRST and before null
        // with NULLS LAST, regardless of direction.
        assert_eq!(
            Ordering::Greater,
            compare(Some(42.into()), None, OrderType::ascending_nulls_first(),)
        );
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), None, OrderType::ascending_nulls_last(),)
        );
        assert_eq!(
            Ordering::Greater,
            compare(Some(42.into()), None, OrderType::descending_nulls_first(),)
        );
        assert_eq!(
            Ordering::Less,
            compare(Some(42.into()), None, OrderType::descending_nulls_last(),)
        );
    }

    /// Shared assertions for any row comparison function with the
    /// `(OwnedRow, OwnedRow, &[OrderType]) -> Ordering` shape.
    fn common_compare_rows<CmpFn>(compare: CmpFn)
    where
        CmpFn: Fn(OwnedRow, OwnedRow, &[OrderType]) -> Ordering,
    {
        assert_eq!(
            Ordering::Equal,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                &[OrderType::ascending(), OrderType::ascending()],
            )
        );

        // The first column ties; the second column is compared descending.
        assert_eq!(
            Ordering::Greater,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(100.into())]),
                &[OrderType::ascending(), OrderType::descending()],
            )
        );

        assert_eq!(
            Ordering::Less,
            compare(
                OwnedRow::new(vec![Some(42.into()), Some(42.into())]),
                OwnedRow::new(vec![Some(42.into()), Some(100.into())]),
                &[OrderType::ascending(), OrderType::ascending()],
            )
        );
    }

    #[test]
    fn test_compare_datum() {
        common_compare_datum(cmp_datum);
    }

    #[test]
    fn test_compare_rows() {
        common_compare_rows(cmp_rows);
    }

    /// The partial variant agrees with the shared cases and yields `None` for
    /// datums of different types.
    #[test]
    fn test_partial_compare_datum() {
        common_compare_datum(|lhs, rhs, order| partial_cmp_datum(lhs, rhs, order).unwrap());

        assert_eq!(
            None,
            partial_cmp_datum(
                Some(ScalarImpl::from(42)),
                Some(ScalarImpl::from("abc")),
                OrderType::default()
            )
        )
    }

    /// The partial variant agrees with the shared cases and yields `None` for
    /// rows whose columns have different types.
    #[test]
    fn test_partial_compare_rows() {
        common_compare_rows(|lhs, rhs, orders| partial_cmp_rows(lhs, rhs, orders).unwrap());

        assert_eq!(
            None,
            partial_cmp_rows(
                OwnedRow::new(vec![Some(42.into())]),
                OwnedRow::new(vec![Some("abc".into())]),
                &[OrderType::default()]
            )
        )
    }
}