risingwave_common/util/
memcmp_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use bytes::{Buf, BufMut};
18use itertools::Itertools;
19use risingwave_common_estimate_size::EstimateSize;
20use serde::{Deserialize, Serialize};
21
22use super::iter_util::{ZipEqDebug, ZipEqFast};
23use crate::array::{ArrayImpl, DataChunk};
24use crate::row::{OwnedRow, Row};
25use crate::types::{
26    DataType, Date, Datum, F32, F64, Int256, ScalarImpl, Serial, Time, Timestamp, Timestamptz,
27    ToDatumRef,
28};
29use crate::util::sort_util::{ColumnOrder, OrderType};
30
// Null tags used for datums nested inside composite values, where the outer
// `OrderType` does not choose the tag. With these values:
// NULL > any non-NULL value by default
/// Tag byte written for a `NULL` datum nested inside a composite value.
const DEFAULT_NULL_TAG_NONE: u8 = 1;
/// Tag byte written before the payload of a non-`NULL` datum nested inside a composite value.
const DEFAULT_NULL_TAG_SOME: u8 = 0;
34
35pub(crate) fn serialize_datum(
36    datum: impl ToDatumRef,
37    order: OrderType,
38    serializer: &mut memcomparable::Serializer<impl BufMut>,
39) -> memcomparable::Result<()> {
40    serializer.set_reverse(order.is_descending());
41    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
42        (1u8, 0u8) // None > Some
43    } else {
44        (0u8, 1u8) // None < Some
45    };
46    if let Some(scalar) = datum.to_datum_ref() {
47        null_tag_some.serialize(&mut *serializer)?;
48        scalar.serialize(serializer)?;
49    } else {
50        null_tag_none.serialize(serializer)?;
51    }
52    Ok(())
53}
54
55pub(crate) fn serialize_datum_in_composite(
56    datum: impl ToDatumRef,
57    serializer: &mut memcomparable::Serializer<impl BufMut>,
58) -> memcomparable::Result<()> {
59    // NOTE: No need to call `serializer.set_reverse` because we are inside a
60    // composite type value, we should follow the outside order, except for `NULL`s.
61    if let Some(scalar) = datum.to_datum_ref() {
62        DEFAULT_NULL_TAG_SOME.serialize(&mut *serializer)?;
63        scalar.serialize(serializer)?;
64    } else {
65        DEFAULT_NULL_TAG_NONE.serialize(serializer)?;
66    }
67    Ok(())
68}
69
70pub(crate) fn deserialize_datum(
71    ty: &DataType,
72    order: OrderType,
73    deserializer: &mut memcomparable::Deserializer<impl Buf>,
74) -> memcomparable::Result<Datum> {
75    deserializer.set_reverse(order.is_descending());
76    let null_tag = u8::deserialize(&mut *deserializer)?;
77    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
78        (1u8, 0u8) // None > Some
79    } else {
80        (0u8, 1u8) // None < Some
81    };
82    if null_tag == null_tag_none {
83        Ok(None)
84    } else if null_tag == null_tag_some {
85        Ok(Some(ScalarImpl::deserialize(ty, deserializer)?))
86    } else {
87        Err(memcomparable::Error::InvalidTagEncoding(null_tag as _))
88    }
89}
90
91pub(crate) fn deserialize_datum_in_composite(
92    ty: &DataType,
93    deserializer: &mut memcomparable::Deserializer<impl Buf>,
94) -> memcomparable::Result<Datum> {
95    // NOTE: Similar to serialization, we should follow the outside order, except for `NULL`s.
96    let null_tag = u8::deserialize(&mut *deserializer)?;
97    if null_tag == DEFAULT_NULL_TAG_NONE {
98        Ok(None)
99    } else if null_tag == DEFAULT_NULL_TAG_SOME {
100        Ok(Some(ScalarImpl::deserialize(ty, deserializer)?))
101    } else {
102        Err(memcomparable::Error::InvalidTagEncoding(null_tag as _))
103    }
104}
105
106/// Deserialize the `data_size` of `input_data_type` in `memcmp_encoding`. This function will
107/// consume the offset of deserializer then return the length (without memcopy, only length
108/// calculation).
109pub(crate) fn calculate_encoded_size(
110    ty: &DataType,
111    order: OrderType,
112    encoded_data: &[u8],
113) -> memcomparable::Result<usize> {
114    let mut deserializer = memcomparable::Deserializer::new(encoded_data);
115    let (null_tag_none, null_tag_some) = if order.nulls_are_largest() {
116        (1u8, 0u8) // None > Some
117    } else {
118        (0u8, 1u8) // None < Some
119    };
120    deserializer.set_reverse(order.is_descending());
121    calculate_encoded_size_inner(ty, null_tag_none, null_tag_some, &mut deserializer)
122}
123
124fn calculate_encoded_size_inner(
125    ty: &DataType,
126    null_tag_none: u8,
127    null_tag_some: u8,
128    deserializer: &mut memcomparable::Deserializer<impl Buf>,
129) -> memcomparable::Result<usize> {
130    let base_position = deserializer.position();
131    let null_tag = u8::deserialize(&mut *deserializer)?;
132    if null_tag == null_tag_none {
133        // deserialize nothing more
134    } else if null_tag == null_tag_some {
135        use std::mem::size_of;
136        let len = match ty {
137            DataType::Int16 => size_of::<i16>(),
138            DataType::Int32 => size_of::<i32>(),
139            DataType::Int64 => size_of::<i64>(),
140            DataType::Serial => size_of::<Serial>(),
141            DataType::Float32 => size_of::<F32>(),
142            DataType::Float64 => size_of::<F64>(),
143            DataType::Date => size_of::<Date>(),
144            DataType::Time => size_of::<Time>(),
145            DataType::Timestamp => size_of::<Timestamp>(),
146            DataType::Timestamptz => size_of::<Timestamptz>(),
147            DataType::Boolean => size_of::<u8>(),
148            // Interval is serialized as (i32, i32, i64)
149            DataType::Interval => size_of::<(i32, i32, i64)>(),
150            DataType::Decimal => {
151                deserializer.deserialize_decimal()?;
152                0 // the len is not used since decimal is not a fixed length type
153            }
154            // these types are var-length and should only be determine at runtime.
155            // TODO: need some test for this case (e.g. e2e test)
156            DataType::List { .. } | DataType::Map(_) => deserializer.skip_bytes()?,
157            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
158            DataType::Struct(t) => t
159                .types()
160                .map(|field| {
161                    // use default null tags inside composite type
162                    calculate_encoded_size_inner(
163                        field,
164                        DEFAULT_NULL_TAG_NONE,
165                        DEFAULT_NULL_TAG_SOME,
166                        deserializer,
167                    )
168                })
169                .try_fold(0, |a, b| b.map(|b| a + b))?,
170            DataType::Jsonb => deserializer.skip_bytes()?,
171            DataType::Varchar => deserializer.skip_bytes()?,
172            DataType::Bytea => deserializer.skip_bytes()?,
173            DataType::Int256 => Int256::MEMCMP_ENCODED_SIZE,
174        };
175
176        // consume offset of fixed_type
177        if deserializer.position() == base_position + 1 {
178            // fixed type
179            deserializer.advance(len);
180        }
181    } else {
182        return Err(memcomparable::Error::InvalidTagEncoding(null_tag as _));
183    }
184
185    Ok(deserializer.position() - base_position)
186}
187
/// An owned, immutable byte buffer holding a memcomparable-encoded value or row.
///
/// The derived `PartialOrd`/`Ord` compare the wrapped bytes, so ordering two
/// `MemcmpEncoded` values orders them by their encoded form.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, EstimateSize)]
pub struct MemcmpEncoded(Box<[u8]>);
190
191impl MemcmpEncoded {
192    pub fn as_inner(&self) -> &[u8] {
193        &self.0
194    }
195
196    pub fn into_inner(self) -> Box<[u8]> {
197        self.0
198    }
199}
200
201impl AsRef<[u8]> for MemcmpEncoded {
202    fn as_ref(&self) -> &[u8] {
203        &self.0
204    }
205}
206
207impl Deref for MemcmpEncoded {
208    type Target = [u8];
209
210    fn deref(&self) -> &Self::Target {
211        &self.0
212    }
213}
214
215impl IntoIterator for MemcmpEncoded {
216    type IntoIter = std::vec::IntoIter<Self::Item>;
217    type Item = u8;
218
219    fn into_iter(self) -> Self::IntoIter {
220        self.0.into_vec().into_iter()
221    }
222}
223
224impl FromIterator<u8> for MemcmpEncoded {
225    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
226        Self(iter.into_iter().collect())
227    }
228}
229
230impl From<Vec<u8>> for MemcmpEncoded {
231    fn from(v: Vec<u8>) -> Self {
232        Self(v.into_boxed_slice())
233    }
234}
235
236impl From<Box<[u8]>> for MemcmpEncoded {
237    fn from(v: Box<[u8]>) -> Self {
238        Self(v)
239    }
240}
241
242impl From<MemcmpEncoded> for Vec<u8> {
243    fn from(v: MemcmpEncoded) -> Self {
244        v.0.into()
245    }
246}
247
248impl From<MemcmpEncoded> for Box<[u8]> {
249    fn from(v: MemcmpEncoded) -> Self {
250        v.0
251    }
252}
253
254/// Encode a datum into memcomparable format.
255pub fn encode_value(
256    value: impl ToDatumRef,
257    order: OrderType,
258) -> memcomparable::Result<MemcmpEncoded> {
259    let mut serializer = memcomparable::Serializer::new(vec![]);
260    serialize_datum(value, order, &mut serializer)?;
261    Ok(serializer.into_inner().into())
262}
263
264/// Decode a datum from memcomparable format.
265pub fn decode_value(
266    ty: &DataType,
267    encoded_value: &[u8],
268    order: OrderType,
269) -> memcomparable::Result<Datum> {
270    let mut deserializer = memcomparable::Deserializer::new(encoded_value);
271    deserialize_datum(ty, order, &mut deserializer)
272}
273
274/// Encode an array into memcomparable format.
275pub fn encode_array(
276    array: &ArrayImpl,
277    order: OrderType,
278) -> memcomparable::Result<Vec<MemcmpEncoded>> {
279    let mut data = Vec::with_capacity(array.len());
280    for datum in array.iter() {
281        data.push(encode_value(datum, order)?);
282    }
283    Ok(data)
284}
285
286/// Encode a chunk into memcomparable format.
287pub fn encode_chunk(
288    chunk: &DataChunk,
289    column_orders: &[ColumnOrder],
290) -> memcomparable::Result<Vec<MemcmpEncoded>> {
291    let encoded_columns: Vec<_> = column_orders
292        .iter()
293        .map(|o| encode_array(chunk.column_at(o.column_index), o.order_type))
294        .try_collect()?;
295
296    let mut encoded_chunk = vec![vec![]; chunk.capacity()];
297    for encoded_column in encoded_columns {
298        for (encoded_row, data) in encoded_chunk.iter_mut().zip_eq_fast(encoded_column) {
299            encoded_row.extend(data);
300        }
301    }
302
303    Ok(encoded_chunk.into_iter().map(Into::into).collect())
304}
305
306/// Encode a row into memcomparable format.
307pub fn encode_row(
308    row: impl Row,
309    order_types: &[OrderType],
310) -> memcomparable::Result<MemcmpEncoded> {
311    let mut serializer = memcomparable::Serializer::new(vec![]);
312    row.iter()
313        .zip_eq_debug(order_types)
314        .try_for_each(|(datum, order)| serialize_datum(datum, *order, &mut serializer))?;
315    Ok(serializer.into_inner().into())
316}
317
318/// Decode a row from memcomparable format.
319pub fn decode_row(
320    encoded_row: &[u8],
321    data_types: &[DataType],
322    order_types: &[OrderType],
323) -> memcomparable::Result<OwnedRow> {
324    let mut deserializer = memcomparable::Deserializer::new(encoded_row);
325    let row_data = data_types
326        .iter()
327        .zip_eq_debug(order_types)
328        .map(|(dt, ot)| deserialize_datum(dt, *ot, &mut deserializer))
329        .try_collect()?;
330    Ok(OwnedRow::new(row_data))
331}
332
333#[cfg(test)]
334mod tests {
335    use std::ops::Neg;
336
337    use rand::rng as thread_rng;
338
339    use super::*;
340    use crate::array::{ListValue, StructValue};
341    use crate::row::RowExt;
342    use crate::types::FloatExt;
343
344    #[test]
345    fn test_memcomparable() {
346        fn encode_num(num: Option<i32>, order_type: OrderType) -> MemcmpEncoded {
347            encode_value(num.map(ScalarImpl::from), order_type).unwrap()
348        }
349
350        {
351            // default ascending
352            let order_type = OrderType::ascending();
353            let memcmp_minus_1 = encode_num(Some(-1), order_type);
354            let memcmp_3874 = encode_num(Some(3874), order_type);
355            let memcmp_45745 = encode_num(Some(45745), order_type);
356            let memcmp_i32_min = encode_num(Some(i32::MIN), order_type);
357            let memcmp_i32_max = encode_num(Some(i32::MAX), order_type);
358            let memcmp_none = encode_num(None, order_type);
359
360            assert!(memcmp_3874 < memcmp_45745);
361            assert!(memcmp_3874 < memcmp_i32_max);
362            assert!(memcmp_45745 < memcmp_i32_max);
363
364            assert!(memcmp_i32_min < memcmp_i32_max);
365            assert!(memcmp_i32_min < memcmp_3874);
366            assert!(memcmp_i32_min < memcmp_45745);
367
368            assert!(memcmp_minus_1 < memcmp_3874);
369            assert!(memcmp_minus_1 < memcmp_45745);
370            assert!(memcmp_minus_1 < memcmp_i32_max);
371            assert!(memcmp_minus_1 > memcmp_i32_min);
372
373            assert!(memcmp_none > memcmp_minus_1);
374            assert!(memcmp_none > memcmp_3874);
375            assert!(memcmp_none > memcmp_i32_min);
376            assert!(memcmp_none > memcmp_i32_max);
377        }
378        {
379            // default descending
380            let order_type = OrderType::descending();
381            let memcmp_minus_1 = encode_num(Some(-1), order_type);
382            let memcmp_3874 = encode_num(Some(3874), order_type);
383            let memcmp_none = encode_num(None, order_type);
384
385            assert!(memcmp_none < memcmp_minus_1);
386            assert!(memcmp_none < memcmp_3874);
387            assert!(memcmp_3874 < memcmp_minus_1);
388        }
389        {
390            // ASC NULLS FIRST (NULLS SMALLEST)
391            let order_type = OrderType::ascending_nulls_first();
392            let memcmp_minus_1 = encode_num(Some(-1), order_type);
393            let memcmp_3874 = encode_num(Some(3874), order_type);
394            let memcmp_none = encode_num(None, order_type);
395            assert!(memcmp_none < memcmp_minus_1);
396            assert!(memcmp_none < memcmp_3874);
397        }
398        {
399            // ASC NULLS LAST (NULLS LARGEST)
400            let order_type = OrderType::ascending_nulls_last();
401            let memcmp_minus_1 = encode_num(Some(-1), order_type);
402            let memcmp_3874 = encode_num(Some(3874), order_type);
403            let memcmp_none = encode_num(None, order_type);
404            assert!(memcmp_none > memcmp_minus_1);
405            assert!(memcmp_none > memcmp_3874);
406        }
407        {
408            // DESC NULLS FIRST (NULLS LARGEST)
409            let order_type = OrderType::descending_nulls_first();
410            let memcmp_minus_1 = encode_num(Some(-1), order_type);
411            let memcmp_3874 = encode_num(Some(3874), order_type);
412            let memcmp_none = encode_num(None, order_type);
413            assert!(memcmp_none < memcmp_minus_1);
414            assert!(memcmp_none < memcmp_3874);
415        }
416        {
417            // DESC NULLS LAST (NULLS SMALLEST)
418            let order_type = OrderType::descending_nulls_last();
419            let memcmp_minus_1 = encode_num(Some(-1), order_type);
420            let memcmp_3874 = encode_num(Some(3874), order_type);
421            let memcmp_none = encode_num(None, order_type);
422            assert!(memcmp_none > memcmp_minus_1);
423            assert!(memcmp_none > memcmp_3874);
424        }
425    }
426
427    #[test]
428    fn test_memcomparable_structs() {
429        // NOTE: `NULL`s inside composite type values are always the largest.
430
431        let struct_none = Datum::None;
432        let struct_1 = Datum::Some(
433            StructValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(2))]).into(),
434        );
435        let struct_2 = Datum::Some(
436            StructValue::new(vec![Some(ScalarImpl::from(1)), Some(ScalarImpl::from(3))]).into(),
437        );
438        let struct_3 = Datum::Some(StructValue::new(vec![Some(ScalarImpl::from(1)), None]).into());
439
440        {
441            // ASC NULLS FIRST (NULLS SMALLEST)
442            let order_type = OrderType::ascending_nulls_first();
443            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
444            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
445            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
446            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
447            assert!(memcmp_struct_none < memcmp_struct_1);
448            assert!(memcmp_struct_1 < memcmp_struct_2);
449            assert!(memcmp_struct_2 < memcmp_struct_3);
450        }
451        {
452            // ASC NULLS LAST (NULLS LARGEST)
453            let order_type = OrderType::ascending_nulls_last();
454            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
455            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
456            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
457            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
458            assert!(memcmp_struct_1 < memcmp_struct_2);
459            assert!(memcmp_struct_2 < memcmp_struct_3);
460            assert!(memcmp_struct_3 < memcmp_struct_none);
461        }
462        {
463            // DESC NULLS FIRST (NULLS LARGEST)
464            let order_type = OrderType::descending_nulls_first();
465            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
466            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
467            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
468            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
469            assert!(memcmp_struct_none < memcmp_struct_3);
470            assert!(memcmp_struct_3 < memcmp_struct_2);
471            assert!(memcmp_struct_2 < memcmp_struct_1);
472        }
473        {
474            // DESC NULLS LAST (NULLS SMALLEST)
475            let order_type = OrderType::descending_nulls_last();
476            let memcmp_struct_none = encode_value(&struct_none, order_type).unwrap();
477            let memcmp_struct_1 = encode_value(&struct_1, order_type).unwrap();
478            let memcmp_struct_2 = encode_value(&struct_2, order_type).unwrap();
479            let memcmp_struct_3 = encode_value(&struct_3, order_type).unwrap();
480            assert!(memcmp_struct_3 < memcmp_struct_2);
481            assert!(memcmp_struct_2 < memcmp_struct_1);
482            assert!(memcmp_struct_1 < memcmp_struct_none);
483        }
484    }
485
486    #[test]
487    fn test_memcomparable_lists() {
488        // NOTE: `NULL`s inside composite type values are always the largest.
489
490        let list_none = Datum::None;
491        let list_1 = Datum::Some(ListValue::from_iter([1, 2]).into());
492        let list_2 = Datum::Some(ListValue::from_iter([1, 3]).into());
493        let list_3 = Datum::Some(ListValue::from_iter([Some(1), None]).into());
494
495        {
496            // ASC NULLS FIRST (NULLS SMALLEST)
497            let order_type = OrderType::ascending_nulls_first();
498            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
499            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
500            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
501            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
502            assert!(memcmp_list_none < memcmp_list_1);
503            assert!(memcmp_list_1 < memcmp_list_2);
504            assert!(memcmp_list_2 < memcmp_list_3);
505        }
506        {
507            // ASC NULLS LAST (NULLS LARGEST)
508            let order_type = OrderType::ascending_nulls_last();
509            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
510            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
511            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
512            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
513            assert!(memcmp_list_1 < memcmp_list_2);
514            assert!(memcmp_list_2 < memcmp_list_3);
515            assert!(memcmp_list_3 < memcmp_list_none);
516        }
517        {
518            // DESC NULLS FIRST (NULLS LARGEST)
519            let order_type = OrderType::descending_nulls_first();
520            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
521            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
522            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
523            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
524            assert!(memcmp_list_none < memcmp_list_3);
525            assert!(memcmp_list_3 < memcmp_list_2);
526            assert!(memcmp_list_2 < memcmp_list_1);
527        }
528        {
529            // DESC NULLS LAST (NULLS SMALLEST)
530            let order_type = OrderType::descending_nulls_last();
531            let memcmp_list_none = encode_value(&list_none, order_type).unwrap();
532            let memcmp_list_1 = encode_value(&list_1, order_type).unwrap();
533            let memcmp_list_2 = encode_value(&list_2, order_type).unwrap();
534            let memcmp_list_3 = encode_value(&list_3, order_type).unwrap();
535            assert!(memcmp_list_3 < memcmp_list_2);
536            assert!(memcmp_list_2 < memcmp_list_1);
537            assert!(memcmp_list_1 < memcmp_list_none);
538        }
539    }
540
541    #[test]
542    fn test_issue_legacy_2057_ordered_float_memcomparable() {
543        use num_traits::*;
544        use rand::seq::SliceRandom;
545
546        fn serialize(f: F32) -> MemcmpEncoded {
547            encode_value(Some(ScalarImpl::from(f)), OrderType::default()).unwrap()
548        }
549
550        fn deserialize(data: MemcmpEncoded) -> F32 {
551            decode_value(&DataType::Float32, &data, OrderType::default())
552                .unwrap()
553                .unwrap()
554                .into_float32()
555        }
556
557        let floats = vec![
558            // -inf
559            F32::neg_infinity(),
560            // -1
561            F32::one().neg(),
562            // 0, -0 should be treated the same
563            F32::zero(),
564            F32::neg_zero(),
565            F32::zero(),
566            // 1
567            F32::one(),
568            // inf
569            F32::infinity(),
570            // nan, -nan should be treated the same
571            F32::nan(),
572            F32::nan().neg(),
573            F32::nan(),
574        ];
575        assert!(floats.is_sorted());
576
577        let mut floats_clone = floats.clone();
578        floats_clone.shuffle(&mut thread_rng());
579        floats_clone.sort();
580        assert_eq!(floats, floats_clone);
581
582        let memcomparables = floats.clone().into_iter().map(serialize).collect_vec();
583        assert!(memcomparables.is_sorted());
584
585        let decoded_floats = memcomparables.into_iter().map(deserialize).collect_vec();
586        assert!(decoded_floats.is_sorted());
587        assert_eq!(floats, decoded_floats);
588    }
589
590    #[test]
591    fn test_encode_row() {
592        let v10 = Some(ScalarImpl::Int32(42));
593        let v10_cloned = v10.clone();
594        let v11 = Some(ScalarImpl::Utf8("hello".into()));
595        let v11_cloned = v11.clone();
596        let v12 = Some(ScalarImpl::Float32(4.0.into()));
597        let v20 = Some(ScalarImpl::Int32(42));
598        let v21 = Some(ScalarImpl::Utf8("hell".into()));
599        let v22 = Some(ScalarImpl::Float32(3.0.into()));
600
601        let row1 = OwnedRow::new(vec![v10, v11, v12]);
602        let row2 = OwnedRow::new(vec![v20, v21, v22]);
603        let order_col_indices = vec![0, 1];
604        let order_types = vec![OrderType::ascending(), OrderType::descending()];
605
606        let encoded_row1 = encode_row(row1.project(&order_col_indices), &order_types).unwrap();
607        let encoded_v10 = encode_value(
608            v10_cloned.as_ref().map(|x| x.as_scalar_ref_impl()),
609            OrderType::ascending(),
610        )
611        .unwrap();
612        let encoded_v11 = encode_value(
613            v11_cloned.as_ref().map(|x| x.as_scalar_ref_impl()),
614            OrderType::descending(),
615        )
616        .unwrap();
617        let concated_encoded_row1 = encoded_v10.into_iter().chain(encoded_v11).collect();
618        assert_eq!(encoded_row1, concated_encoded_row1);
619
620        let encoded_row2 = encode_row(row2.project(&order_col_indices), &order_types).unwrap();
621        assert!(encoded_row1 < encoded_row2);
622    }
623
624    // See also `row_value_encode_decode()` in `src/common/src/row/owned_row.rs`
625    #[test]
626    fn test_decode_row() {
627        let encoded: Vec<u8> = vec![
628            0, 128, 0, 0, 42, 255, 127, 255, 255, 255, 255, 255, 255, 213, 1, 0, 193, 186, 163,
629            215, 255, 254, 153, 144, 144, 144, 144, 144, 255, 255, 249, 0, 1, 98, 97, 97, 97, 97,
630            114, 0, 0, 6,
631        ];
632
633        let order_types = vec![
634            OrderType::ascending(),
635            OrderType::descending(),
636            OrderType::ascending(),
637            OrderType::ascending(),
638            OrderType::descending(),
639            OrderType::ascending(),
640        ];
641        let data_types = vec![
642            DataType::Int32,
643            DataType::Int64,
644            DataType::Timestamp,
645            DataType::Float32,
646            DataType::Varchar,
647            DataType::Bytea,
648        ];
649
650        let result = decode_row(&encoded, &data_types, &order_types).unwrap();
651        // println!("{:?}", &result);
652
653        let expected = OwnedRow::new(vec![
654            Some(ScalarImpl::Int32(42)),
655            Some(ScalarImpl::Int64(42)),
656            None,
657            Some(ScalarImpl::Float32(23.33.into())),
658            Some(ScalarImpl::Utf8("fooooo".into())),
659            Some(ScalarImpl::Bytea("baaaar".as_bytes().into())),
660        ]);
661        assert_eq!(&result, &expected);
662    }
663
664    #[test]
665    fn test_encode_chunk() {
666        let v10 = Some(ScalarImpl::Int32(42));
667        let v11 = Some(ScalarImpl::Utf8("hello".into()));
668        let v12 = Some(ScalarImpl::Float32(4.0.into()));
669        let v20 = Some(ScalarImpl::Int32(42));
670        let v21 = Some(ScalarImpl::Utf8("hell".into()));
671        let v22 = Some(ScalarImpl::Float32(3.0.into()));
672
673        let row1 = OwnedRow::new(vec![v10, v11, v12]);
674        let row2 = OwnedRow::new(vec![v20, v21, v22]);
675        let chunk = DataChunk::from_rows(
676            &[row1.clone(), row2.clone()],
677            &[DataType::Int32, DataType::Varchar, DataType::Float32],
678        );
679        let order_col_indices = vec![0, 1];
680        let order_types = vec![OrderType::ascending(), OrderType::descending()];
681        let column_orders = order_col_indices
682            .iter()
683            .zip_eq_fast(&order_types)
684            .map(|(i, o)| ColumnOrder::new(*i, *o))
685            .collect_vec();
686
687        let encoded_row1 = encode_row(row1.project(&order_col_indices), &order_types).unwrap();
688        let encoded_row2 = encode_row(row2.project(&order_col_indices), &order_types).unwrap();
689        let encoded_chunk = encode_chunk(&chunk, &column_orders).unwrap();
690        assert_eq!(&encoded_chunk, &[encoded_row1, encoded_row2]);
691    }
692}