risingwave_common/util/
memcmp_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use bytes::{Buf, BufMut};
18use itertools::Itertools;
19use risingwave_common_estimate_size::EstimateSize;
20use serde::{Deserialize, Serialize};
21
22use super::iter_util::{ZipEqDebug, ZipEqFast};
23use crate::array::{ArrayImpl, DataChunk};
24use crate::row::{OwnedRow, Row};
25use crate::types::{
26    DataType, Date, Datum, F32, F64, Int256, ScalarImpl, Serial, Time, Timestamp, Timestamptz,
27    ToDatumRef,
28};
29use crate::util::sort_util::{ColumnOrder, OrderType};
30
/// Tag byte for a `NULL` datum under the default null ordering, in which `NULL` compares greater
/// than any non-`NULL` value.
const DEFAULT_NULL_TAG_NONE: u8 = 1;
/// Tag byte that precedes a non-`NULL` datum under the default null ordering.
const DEFAULT_NULL_TAG_SOME: u8 = 0;
34
35pub(crate) fn serialize_datum(
36    datum: impl ToDatumRef,
37    order: OrderType,
38    serializer: &mut memcomparable::Serializer<impl BufMut>,
39) -> memcomparable::Result<()> {
40    serializer.set_reverse(order.is_descending());
41    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
42        (1u8, 0u8) // None > Some
43    } else {
44        (0u8, 1u8) // None < Some
45    };
46    if let Some(scalar) = datum.to_datum_ref() {
47        null_tag_some.serialize(&mut *serializer)?;
48        scalar.serialize(serializer)?;
49    } else {
50        null_tag_none.serialize(serializer)?;
51    }
52    Ok(())
53}
54
55pub(crate) fn serialize_datum_in_composite(
56    datum: impl ToDatumRef,
57    serializer: &mut memcomparable::Serializer<impl BufMut>,
58) -> memcomparable::Result<()> {
59    // NOTE: No need to call `serializer.set_reverse` because we are inside a
60    // composite type value, we should follow the outside order, except for `NULL`s.
61    if let Some(scalar) = datum.to_datum_ref() {
62        DEFAULT_NULL_TAG_SOME.serialize(&mut *serializer)?;
63        scalar.serialize(serializer)?;
64    } else {
65        DEFAULT_NULL_TAG_NONE.serialize(serializer)?;
66    }
67    Ok(())
68}
69
70pub(crate) fn deserialize_datum(
71    ty: &DataType,
72    order: OrderType,
73    deserializer: &mut memcomparable::Deserializer<impl Buf>,
74) -> memcomparable::Result<Datum> {
75    deserializer.set_reverse(order.is_descending());
76    let null_tag = u8::deserialize(&mut *deserializer)?;
77    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
78        (1u8, 0u8) // None > Some
79    } else {
80        (0u8, 1u8) // None < Some
81    };
82    if null_tag == null_tag_none {
83        Ok(None)
84    } else if null_tag == null_tag_some {
85        Ok(Some(ScalarImpl::deserialize(ty, deserializer)?))
86    } else {
87        Err(memcomparable::Error::InvalidTagEncoding(null_tag as _))
88    }
89}
90
91pub(crate) fn deserialize_datum_in_composite(
92    ty: &DataType,
93    deserializer: &mut memcomparable::Deserializer<impl Buf>,
94) -> memcomparable::Result<Datum> {
95    // NOTE: Similar to serialization, we should follow the outside order, except for `NULL`s.
96    let null_tag = u8::deserialize(&mut *deserializer)?;
97    if null_tag == DEFAULT_NULL_TAG_NONE {
98        Ok(None)
99    } else if null_tag == DEFAULT_NULL_TAG_SOME {
100        Ok(Some(ScalarImpl::deserialize(ty, deserializer)?))
101    } else {
102        Err(memcomparable::Error::InvalidTagEncoding(null_tag as _))
103    }
104}
105
106/// Deserialize the `data_size` of `input_data_type` in `memcmp_encoding`. This function will
107/// consume the offset of deserializer then return the length (without memcopy, only length
108/// calculation).
109pub(crate) fn calculate_encoded_size(
110    ty: &DataType,
111    order: OrderType,
112    encoded_data: &[u8],
113) -> memcomparable::Result<usize> {
114    let mut deserializer = memcomparable::Deserializer::new(encoded_data);
115    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
116        (1u8, 0u8) // None > Some
117    } else {
118        (0u8, 1u8) // None < Some
119    };
120    deserializer.set_reverse(order.is_descending());
121    calculate_encoded_size_inner(ty, null_tag_none, null_tag_some, &mut deserializer)
122}
123
124fn calculate_encoded_size_inner(
125    ty: &DataType,
126    null_tag_none: u8,
127    null_tag_some: u8,
128    deserializer: &mut memcomparable::Deserializer<impl Buf>,
129) -> memcomparable::Result<usize> {
130    let base_position = deserializer.position();
131    let null_tag = u8::deserialize(&mut *deserializer)?;
132    if null_tag == null_tag_none {
133        // deserialize nothing more
134    } else if null_tag == null_tag_some {
135        use std::mem::size_of;
136        let len = match ty {
137            DataType::Int16 => size_of::<i16>(),
138            DataType::Int32 => size_of::<i32>(),
139            DataType::Int64 => size_of::<i64>(),
140            DataType::Serial => size_of::<Serial>(),
141            DataType::Float32 => size_of::<F32>(),
142            DataType::Float64 => size_of::<F64>(),
143            DataType::Date => size_of::<Date>(),
144            DataType::Time => size_of::<Time>(),
145            DataType::Timestamp => size_of::<Timestamp>(),
146            DataType::Timestamptz => size_of::<Timestamptz>(),
147            DataType::Boolean => size_of::<u8>(),
148            // Interval is serialized as (i32, i32, i64)
149            DataType::Interval => size_of::<(i32, i32, i64)>(),
150            DataType::Decimal => {
151                deserializer.deserialize_decimal()?;
152                0 // the len is not used since decimal is not a fixed length type
153            }
154            // these types are var-length and should only be determine at runtime.
155            // TODO: need some test for this case (e.g. e2e test)
156            DataType::List { .. } | DataType::Map(_) => deserializer.skip_bytes()?,
157            DataType::Struct(t) => t
158                .types()
159                .map(|field| {
160                    // use default null tags inside composite type
161                    calculate_encoded_size_inner(
162                        field,
163                        DEFAULT_NULL_TAG_NONE,
164                        DEFAULT_NULL_TAG_SOME,
165                        deserializer,
166                    )
167                })
168                .try_fold(0, |a, b| b.map(|b| a + b))?,
169            DataType::Jsonb => deserializer.skip_bytes()?,
170            DataType::Varchar => deserializer.skip_bytes()?,
171            DataType::Bytea => deserializer.skip_bytes()?,
172            DataType::Int256 => Int256::MEMCMP_ENCODED_SIZE,
173        };
174
175        // consume offset of fixed_type
176        if deserializer.position() == base_position + 1 {
177            // fixed type
178            deserializer.advance(len);
179        }
180    } else {
181        return Err(memcomparable::Error::InvalidTagEncoding(null_tag as _));
182    }
183
184    Ok(deserializer.position() - base_position)
185}
186
187#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, EstimateSize)]
188pub struct MemcmpEncoded(Box<[u8]>);
189
190impl MemcmpEncoded {
191    pub fn as_inner(&self) -> &[u8] {
192        &self.0
193    }
194
195    pub fn into_inner(self) -> Box<[u8]> {
196        self.0
197    }
198}
199
200impl AsRef<[u8]> for MemcmpEncoded {
201    fn as_ref(&self) -> &[u8] {
202        &self.0
203    }
204}
205
206impl Deref for MemcmpEncoded {
207    type Target = [u8];
208
209    fn deref(&self) -> &Self::Target {
210        &self.0
211    }
212}
213
214impl IntoIterator for MemcmpEncoded {
215    type IntoIter = std::vec::IntoIter<Self::Item>;
216    type Item = u8;
217
218    fn into_iter(self) -> Self::IntoIter {
219        self.0.into_vec().into_iter()
220    }
221}
222
223impl FromIterator<u8> for MemcmpEncoded {
224    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
225        Self(iter.into_iter().collect())
226    }
227}
228
229impl From<Vec<u8>> for MemcmpEncoded {
230    fn from(v: Vec<u8>) -> Self {
231        Self(v.into_boxed_slice())
232    }
233}
234
235impl From<Box<[u8]>> for MemcmpEncoded {
236    fn from(v: Box<[u8]>) -> Self {
237        Self(v)
238    }
239}
240
241impl From<MemcmpEncoded> for Vec<u8> {
242    fn from(v: MemcmpEncoded) -> Self {
243        v.0.into()
244    }
245}
246
247impl From<MemcmpEncoded> for Box<[u8]> {
248    fn from(v: MemcmpEncoded) -> Self {
249        v.0
250    }
251}
252
253/// Encode a datum into memcomparable format.
254pub fn encode_value(
255    value: impl ToDatumRef,
256    order: OrderType,
257) -> memcomparable::Result<MemcmpEncoded> {
258    let mut serializer = memcomparable::Serializer::new(vec![]);
259    serialize_datum(value, order, &mut serializer)?;
260    Ok(serializer.into_inner().into())
261}
262
263/// Decode a datum from memcomparable format.
264pub fn decode_value(
265    ty: &DataType,
266    encoded_value: &[u8],
267    order: OrderType,
268) -> memcomparable::Result<Datum> {
269    let mut deserializer = memcomparable::Deserializer::new(encoded_value);
270    deserialize_datum(ty, order, &mut deserializer)
271}
272
273/// Encode an array into memcomparable format.
274pub fn encode_array(
275    array: &ArrayImpl,
276    order: OrderType,
277) -> memcomparable::Result<Vec<MemcmpEncoded>> {
278    let mut data = Vec::with_capacity(array.len());
279    for datum in array.iter() {
280        data.push(encode_value(datum, order)?);
281    }
282    Ok(data)
283}
284
285/// Encode a chunk into memcomparable format.
286pub fn encode_chunk(
287    chunk: &DataChunk,
288    column_orders: &[ColumnOrder],
289) -> memcomparable::Result<Vec<MemcmpEncoded>> {
290    let encoded_columns: Vec<_> = column_orders
291        .iter()
292        .map(|o| encode_array(chunk.column_at(o.column_index), o.order_type))
293        .try_collect()?;
294
295    let mut encoded_chunk = vec![vec![]; chunk.capacity()];
296    for encoded_column in encoded_columns {
297        for (encoded_row, data) in encoded_chunk.iter_mut().zip_eq_fast(encoded_column) {
298            encoded_row.extend(data);
299        }
300    }
301
302    Ok(encoded_chunk.into_iter().map(Into::into).collect())
303}
304
305/// Encode a row into memcomparable format.
306pub fn encode_row(
307    row: impl Row,
308    order_types: &[OrderType],
309) -> memcomparable::Result<MemcmpEncoded> {
310    let mut serializer = memcomparable::Serializer::new(vec![]);
311    row.iter()
312        .zip_eq_debug(order_types)
313        .try_for_each(|(datum, order)| serialize_datum(datum, *order, &mut serializer))?;
314    Ok(serializer.into_inner().into())
315}
316
317/// Decode a row from memcomparable format.
318pub fn decode_row(
319    encoded_row: &[u8],
320    data_types: &[DataType],
321    order_types: &[OrderType],
322) -> memcomparable::Result<OwnedRow> {
323    let mut deserializer = memcomparable::Deserializer::new(encoded_row);
324    let row_data = data_types
325        .iter()
326        .zip_eq_debug(order_types)
327        .map(|(dt, ot)| deserialize_datum(dt, *ot, &mut deserializer))
328        .try_collect()?;
329    Ok(OwnedRow::new(row_data))
330}
331
332#[cfg(test)]
333mod tests {
334    use std::ops::Neg;
335
336    use rand::rng as thread_rng;
337
338    use super::*;
339    use crate::array::{ListValue, StructValue};
340    use crate::row::RowExt;
341    use crate::types::FloatExt;
342
343    #[test]
344    fn test_memcomparable() {
345        fn encode_num(num: Option<i32>, order_type: OrderType) -> MemcmpEncoded {
346            encode_value(num.map(ScalarImpl::from), order_type).unwrap()
347        }
348
349        {
350            // default ascending
351            let order_type = OrderType::ascending();
352            let memcmp_minus_1 = encode_num(Some(-1), order_type);
353            let memcmp_3874 = encode_num(Some(3874), order_type);
354            let memcmp_45745 = encode_num(Some(45745), order_type);
355            let memcmp_i32_min = encode_num(Some(i32::MIN), order_type);
356            let memcmp_i32_max = encode_num(Some(i32::MAX), order_type);
357            let memcmp_none = encode_num(None, order_type);
358
359            assert!(memcmp_3874 < memcmp_45745);
360            assert!(memcmp_3874 < memcmp_i32_max);
361            assert!(memcmp_45745 < memcmp_i32_max);
362
363            assert!(memcmp_i32_min < memcmp_i32_max);
364            assert!(memcmp_i32_min < memcmp_3874);
365            assert!(memcmp_i32_min < memcmp_45745);
366
367            assert!(memcmp_minus_1 < memcmp_3874);
368            assert!(memcmp_minus_1 < memcmp_45745);
369            assert!(memcmp_minus_1 < memcmp_i32_max);
370            assert!(memcmp_minus_1 > memcmp_i32_min);
371
372            assert!(memcmp_none > memcmp_minus_1);
373            assert!(memcmp_none > memcmp_3874);
374            assert!(memcmp_none > memcmp_i32_min);
375            assert!(memcmp_none > memcmp_i32_max);
376        }
377        {
378            // default descending
379            let order_type = OrderType::descending();
380            let memcmp_minus_1 = encode_num(Some(-1), order_type);
381            let memcmp_3874 = encode_num(Some(3874), order_type);
382            let memcmp_none = encode_num(None, order_type);
383
384            assert!(memcmp_none < memcmp_minus_1);
385            assert!(memcmp_none < memcmp_3874);
386            assert!(memcmp_3874 < memcmp_minus_1);
387        }
388        {
389            // ASC NULLS FIRST (NULLS SMALLEST)
390            let order_type = OrderType::ascending_nulls_first();
391            let memcmp_minus_1 = encode_num(Some(-1), order_type);
392            let memcmp_3874 = encode_num(Some(3874), order_type);
393            let memcmp_none = encode_num(None, order_type);
394            assert!(memcmp_none < memcmp_minus_1);
395            assert!(memcmp_none < memcmp_3874);
396        }
397        {
398            // ASC NULLS LAST (NULLS LARGEST)
399            let order_type = OrderType::ascending_nulls_last();
400            let memcmp_minus_1 = encode_num(Some(-1), order_type);
401            let memcmp_3874 = encode_num(Some(3874), order_type);
402            let memcmp_none = encode_num(None, order_type);
403            assert!(memcmp_none > memcmp_minus_1);
404            assert!(memcmp_none > memcmp_3874);
405        }
406        {
407            // DESC NULLS FIRST (NULLS LARGEST)
408            let order_type = OrderType::descending_nulls_first();
409            let memcmp_minus_1 = encode_num(Some(-1), order_type);
410            let memcmp_3874 = encode_num(Some(3874), order_type);
411            let memcmp_none = encode_num(None, order_type);
412            assert!(memcmp_none < memcmp_minus_1);
413            assert!(memcmp_none < memcmp_3874);
414        }
415        {
416            // DESC NULLS LAST (NULLS SMALLEST)
417            let order_type = OrderType::descending_nulls_last();
418            let memcmp_minus_1 = encode_num(Some(-1), order_type);
419            let memcmp_3874 = encode_num(Some(3874), order_type);
420            let memcmp_none = encode_num(None, order_type);
421            assert!(memcmp_none > memcmp_minus_1);
422            assert!(memcmp_none > memcmp_3874);
423        }
424    }
425
426    #[test]
427    fn test_memcomparable_structs() {
428        // NOTE: `NULL`s inside composite type values are always the largest.
429
430        let struct_none = Datum::None;
431        let struct_1 = Datum::Some(
432            StructValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(2))]).into(),
433        );
434        let struct_2 = Datum::Some(
435            StructValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(3))]).into(),
436        );
437        let struct_3 = Datum::Some(StructValue::new(vec![Some(ScalarImpl::from(1)), None]).into());
438
439        {
440            // ASC NULLS FIRST (NULLS SMALLEST)
441            let order_type = OrderType::ascending_nulls_first();
442            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
443            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
444            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
445            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
446            assert!(memcmp_struct_none < memcmp_struct_1);
447            assert!(memcmp_struct_1 < memcmp_struct_2);
448            assert!(memcmp_struct_2 < memcmp_struct_3);
449        }
450        {
451            // ASC NULLS LAST (NULLS LARGEST)
452            let order_type = OrderType::ascending_nulls_last();
453            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
454            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
455            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
456            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
457            assert!(memcmp_struct_1 < memcmp_struct_2);
458            assert!(memcmp_struct_2 < memcmp_struct_3);
459            assert!(memcmp_struct_3 < memcmp_struct_none);
460        }
461        {
462            // DESC NULLS FIRST (NULLS LARGEST)
463            let order_type = OrderType::descending_nulls_first();
464            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
465            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
466            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
467            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
468            assert!(memcmp_struct_none < memcmp_struct_3);
469            assert!(memcmp_struct_3 < memcmp_struct_2);
470            assert!(memcmp_struct_2 < memcmp_struct_1);
471        }
472        {
473            // DESC NULLS LAST (NULLS SMALLEST)
474            let order_type = OrderType::descending_nulls_last();
475            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
476            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
477            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
478            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
479            assert!(memcmp_struct_3 < memcmp_struct_2);
480            assert!(memcmp_struct_2 < memcmp_struct_1);
481            assert!(memcmp_struct_1 < memcmp_struct_none);
482        }
483    }
484
485    #[test]
486    fn test_memcomparable_lists() {
487        // NOTE: `NULL`s inside composite type values are always the largest.
488
489        let list_none = Datum::None;
490        let list_1 = Datum::Some(ListValue::from_iter([1, 2]).into());
491        let list_2 = Datum::Some(ListValue::from_iter([1, 3]).into());
492        let list_3 = Datum::Some(ListValue::from_iter([Some(1), None]).into());
493
494        {
495            // ASC NULLS FIRST (NULLS SMALLEST)
496            let order_type = OrderType::ascending_nulls_first();
497            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
498            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
499            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
500            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
501            assert!(memcmp_list_none < memcmp_list_1);
502            assert!(memcmp_list_1 < memcmp_list_2);
503            assert!(memcmp_list_2 < memcmp_list_3);
504        }
505        {
506            // ASC NULLS LAST (NULLS LARGEST)
507            let order_type = OrderType::ascending_nulls_last();
508            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
509            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
510            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
511            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
512            assert!(memcmp_list_1 < memcmp_list_2);
513            assert!(memcmp_list_2 < memcmp_list_3);
514            assert!(memcmp_list_3 < memcmp_list_none);
515        }
516        {
517            // DESC NULLS FIRST (NULLS LARGEST)
518            let order_type = OrderType::descending_nulls_first();
519            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
520            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
521            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
522            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
523            assert!(memcmp_list_none < memcmp_list_3);
524            assert!(memcmp_list_3 < memcmp_list_2);
525            assert!(memcmp_list_2 < memcmp_list_1);
526        }
527        {
528            // DESC NULLS LAST (NULLS SMALLEST)
529            let order_type = OrderType::descending_nulls_last();
530            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
531            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
532            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
533            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
534            assert!(memcmp_list_3 < memcmp_list_2);
535            assert!(memcmp_list_2 < memcmp_list_1);
536            assert!(memcmp_list_1 < memcmp_list_none);
537        }
538    }
539
540    #[test]
541    fn test_issue_legacy_2057_ordered_float_memcomparable() {
542        use num_traits::*;
543        use rand::seq::SliceRandom;
544
545        fn serialize(f: F32) -> MemcmpEncoded {
546            encode_value(Some(ScalarImpl::from(f)), OrderType::default()).unwrap()
547        }
548
549        fn deserialize(data: MemcmpEncoded) -> F32 {
550            decode_value(&DataType::Float32, &data, OrderType::default())
551                .unwrap()
552                .unwrap()
553                .into_float32()
554        }
555
556        let floats = vec![
557            // -inf
558            F32::neg_infinity(),
559            // -1
560            F32::one().neg(),
561            // 0, -0 should be treated the same
562            F32::zero(),
563            F32::neg_zero(),
564            F32::zero(),
565            // 1
566            F32::one(),
567            // inf
568            F32::infinity(),
569            // nan, -nan should be treated the same
570            F32::nan(),
571            F32::nan().neg(),
572            F32::nan(),
573        ];
574        assert!(floats.is_sorted());
575
576        let mut floats_clone = floats.clone();
577        floats_clone.shuffle(&mut thread_rng());
578        floats_clone.sort();
579        assert_eq!(floats, floats_clone);
580
581        let memcomparables = floats.clone().into_iter().map(serialize).collect_vec();
582        assert!(memcomparables.is_sorted());
583
584        let decoded_floats = memcomparables.into_iter().map(deserialize).collect_vec();
585        assert!(decoded_floats.is_sorted());
586        assert_eq!(floats, decoded_floats);
587    }
588
589    #[test]
590    fn test_encode_row() {
591        let v10 = Some(ScalarImpl::Int32(42));
592        let v10_cloned = v10.clone();
593        let v11 = Some(ScalarImpl::Utf8("hello".into()));
594        let v11_cloned = v11.clone();
595        let v12 = Some(ScalarImpl::Float32(4.0.into()));
596        let v20 = Some(ScalarImpl::Int32(42));
597        let v21 = Some(ScalarImpl::Utf8("hell".into()));
598        let v22 = Some(ScalarImpl::Float32(3.0.into()));
599
600        let row1 = OwnedRow::new(vec![v10, v11, v12]);
601        let row2 = OwnedRow::new(vec![v20, v21, v22]);
602        let order_col_indices = vec![0, 1];
603        let order_types = vec![OrderType::ascending(), OrderType::descending()];
604
605        let encoded_row1 = encode_row(row1.project(&order_col_indices), &order_types).unwrap();
606        let encoded_v10 = encode_value(
607            v10_cloned.as_ref().map(|x| x.as_scalar_ref_impl()),
608            OrderType::ascending(),
609        )
610        .unwrap();
611        let encoded_v11 = encode_value(
612            v11_cloned.as_ref().map(|x| x.as_scalar_ref_impl()),
613            OrderType::descending(),
614        )
615        .unwrap();
616        let concated_encoded_row1 = encoded_v10.into_iter().chain(encoded_v11).collect();
617        assert_eq!(encoded_row1, concated_encoded_row1);
618
619        let encoded_row2 = encode_row(row2.project(&order_col_indices), &order_types).unwrap();
620        assert!(encoded_row1 < encoded_row2);
621    }
622
623    // See also `row_value_encode_decode()` in `src/common/src/row/owned_row.rs`
624    #[test]
625    fn test_decode_row() {
626        let encoded: Vec<u8> = vec![
627            0, 128, 0, 0, 42, 255, 127, 255, 255, 255, 255, 255, 255, 213, 1, 0, 193, 186, 163,
628            215, 255, 254, 153, 144, 144, 144, 144, 144, 255, 255, 249, 0, 1, 98, 97, 97, 97, 97,
629            114, 0, 0, 6,
630        ];
631
632        let order_types = vec![
633            OrderType::ascending(),
634            OrderType::descending(),
635            OrderType::ascending(),
636            OrderType::ascending(),
637            OrderType::descending(),
638            OrderType::ascending(),
639        ];
640        let data_types = vec![
641            DataType::Int32,
642            DataType::Int64,
643            DataType::Timestamp,
644            DataType::Float32,
645            DataType::Varchar,
646            DataType::Bytea,
647        ];
648
649        let result = decode_row(&encoded, &data_types, &order_types).unwrap();
650        // println!("{:?}", &result);
651
652        let expected = OwnedRow::new(vec![
653            Some(ScalarImpl::Int32(42)),
654            Some(ScalarImpl::Int64(42)),
655            None,
656            Some(ScalarImpl::Float32(23.33.into())),
657            Some(ScalarImpl::Utf8("fooooo".into())),
658            Some(ScalarImpl::Bytea("baaaar".as_bytes().into())),
659        ]);
660        assert_eq!(&result, &expected);
661    }
662
663    #[test]
664    fn test_encode_chunk() {
665        let v10 = Some(ScalarImpl::Int32(42));
666        let v11 = Some(ScalarImpl::Utf8("hello".into()));
667        let v12 = Some(ScalarImpl::Float32(4.0.into()));
668        let v20 = Some(ScalarImpl::Int32(42));
669        let v21 = Some(ScalarImpl::Utf8("hell".into()));
670        let v22 = Some(ScalarImpl::Float32(3.0.into()));
671
672        let row1 = OwnedRow::new(vec![v10, v11, v12]);
673        let row2 = OwnedRow::new(vec![v20, v21, v22]);
674        let chunk = DataChunk::from_rows(
675            &[row1.clone(), row2.clone()],
676            &[DataType::Int32, DataType::Varchar, DataType::Float32],
677        );
678        let order_col_indices = vec![0, 1];
679        let order_types = vec![OrderType::ascending(), OrderType::descending()];
680        let column_orders = order_col_indices
681            .iter()
682            .zip_eq_fast(&order_types)
683            .map(|(i, o)| ColumnOrder::new(*i, *o))
684            .collect_vec();
685
686        let encoded_row1 = encode_row(row1.project(&order_col_indices), &order_types).unwrap();
687        let encoded_row2 = encode_row(row2.project(&order_col_indices), &order_types).unwrap();
688        let encoded_chunk = encode_chunk(&chunk, &column_orders).unwrap();
689        assert_eq!(&encoded_chunk, &[encoded_row1, encoded_row2]);
690    }
691}