risingwave_common/util/
row_serde.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16
17use bytes::BufMut;
18
19use crate::row::{OwnedRow, Row};
20use crate::types::{DataType, ToDatumRef};
21use crate::util::iter_util::{ZipEqDebug, ZipEqFast};
22use crate::util::memcmp_encoding;
23use crate::util::sort_util::OrderType;
24
25/// `OrderedRowSerde` is responsible for serializing and deserializing Ordered Row.
26#[derive(Clone)]
27pub struct OrderedRowSerde {
28    schema: Vec<DataType>,
29    order_types: Vec<OrderType>,
30}
31
32impl OrderedRowSerde {
33    pub fn new(schema: Vec<DataType>, order_types: Vec<OrderType>) -> Self {
34        assert_eq!(schema.len(), order_types.len());
35        Self {
36            schema,
37            order_types,
38        }
39    }
40
41    /// FIXME: This leads to allocation in most cases. Should allow using slices for the schema and
42    /// order types.
43    #[must_use]
44    pub fn prefix(&self, len: usize) -> Cow<'_, Self> {
45        if len == self.order_types.len() {
46            Cow::Borrowed(self)
47        } else {
48            Cow::Owned(Self {
49                schema: self.schema[..len].to_vec(),
50                order_types: self.order_types[..len].to_vec(),
51            })
52        }
53    }
54
55    #[must_use]
56    pub fn index(&self, idx: usize) -> Cow<'_, Self> {
57        if 1 == self.order_types.len() {
58            Cow::Borrowed(self)
59        } else {
60            Cow::Owned(Self {
61                schema: vec![self.schema[idx].clone()],
62                order_types: vec![self.order_types[idx]],
63            })
64        }
65    }
66
67    /// Note: prefer [`Row::memcmp_serialize`] if possible.
68    pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
69        self.serialize_datums(row.iter(), append_to)
70    }
71
72    /// Note: prefer [`Row::memcmp_serialize`] if possible.
73    pub fn serialize_datums(
74        &self,
75        datum_refs: impl Iterator<Item = impl ToDatumRef>,
76        mut append_to: impl BufMut,
77    ) {
78        let mut serializer = memcomparable::Serializer::new(&mut append_to);
79        for (datum, order) in datum_refs.zip_eq_debug(self.order_types.iter().copied()) {
80            memcmp_encoding::serialize_datum(datum, order, &mut serializer).unwrap();
81        }
82    }
83
84    pub fn deserialize(&self, data: &[u8]) -> memcomparable::Result<OwnedRow> {
85        let mut values = Vec::with_capacity(self.schema.len());
86        let mut deserializer = memcomparable::Deserializer::new(data);
87        for (data_type, order) in self
88            .schema
89            .iter()
90            .zip_eq_fast(self.order_types.iter().copied())
91        {
92            let datum = memcmp_encoding::deserialize_datum(data_type, order, &mut deserializer)?;
93            values.push(datum);
94        }
95        Ok(OwnedRow::new(values))
96    }
97
98    pub fn get_order_types(&self) -> &[OrderType] {
99        &self.order_types
100    }
101
102    pub fn get_data_types(&self) -> &[DataType] {
103        &self.schema
104    }
105
106    pub fn deserialize_prefix_len(
107        &self,
108        key: &[u8],
109        prefix_len: usize,
110    ) -> memcomparable::Result<usize> {
111        let mut len: usize = 0;
112        for index in 0..prefix_len {
113            let data_type = &self.schema[index];
114            let data = &key[len..];
115            len +=
116                memcmp_encoding::calculate_encoded_size(data_type, self.order_types[index], data)?;
117        }
118        Ok(len)
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use std::str::FromStr;
125
126    use super::*;
127    use crate::types::{ScalarImpl as S, Timestamp};
128
129    #[test]
130    fn test_ordered_row_serializer() {
131        let orders = vec![OrderType::descending(), OrderType::ascending()];
132        let data_types = vec![DataType::Int16, DataType::Varchar];
133        let serializer = OrderedRowSerde::new(data_types, orders);
134        let row1 = OwnedRow::new(vec![Some(S::Int16(5)), Some(S::Utf8("abc".into()))]);
135        let row2 = OwnedRow::new(vec![Some(S::Int16(5)), Some(S::Utf8("abd".into()))]);
136        let row3 = OwnedRow::new(vec![Some(S::Int16(6)), Some(S::Utf8("abc".into()))]);
137        let rows = vec![row1, row2, row3];
138        let mut array = vec![];
139        for row in &rows {
140            let mut row_bytes = vec![];
141            serializer.serialize(row, &mut row_bytes);
142            array.push(row_bytes);
143        }
144        array.sort();
145        // option 1 byte || number 2 bytes
146        assert_eq!(array[0][2], !6i16.to_be_bytes()[1]);
147        assert_eq!(&array[1][3..], [0, 1, b'a', b'b', b'c', 0, 0, 0, 0, 0, 3u8]);
148        assert_eq!(&array[2][3..], [0, 1, b'a', b'b', b'd', 0, 0, 0, 0, 0, 3u8]);
149    }
150
151    #[test]
152    fn test_ordered_row_deserializer() {
153        use crate::types::*;
154        {
155            // basic
156            let order_types = vec![OrderType::descending(), OrderType::ascending()];
157
158            let schema = vec![DataType::Varchar, DataType::Int16];
159            let serde = OrderedRowSerde::new(schema, order_types);
160            let row1 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(5))]);
161            let row2 = OwnedRow::new(vec![Some(S::Utf8("abd".into())), Some(S::Int16(5))]);
162            let row3 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(6))]);
163            let rows = vec![row1.clone(), row2.clone(), row3.clone()];
164            let mut array = vec![];
165            for row in &rows {
166                let mut row_bytes = vec![];
167                serde.serialize(row, &mut row_bytes);
168                array.push(row_bytes);
169            }
170            assert_eq!(serde.deserialize(&array[0]).unwrap(), row1);
171            assert_eq!(serde.deserialize(&array[1]).unwrap(), row2);
172            assert_eq!(serde.deserialize(&array[2]).unwrap(), row3);
173        }
174
175        {
176            // decimal
177
178            let order_types = vec![OrderType::descending(), OrderType::ascending()];
179
180            let schema = vec![DataType::Varchar, DataType::Decimal];
181            let serde = OrderedRowSerde::new(schema, order_types);
182            let row1 = OwnedRow::new(vec![
183                Some(S::Utf8("abc".into())),
184                Some(S::Decimal(Decimal::NaN)),
185            ]);
186            let row2 = OwnedRow::new(vec![
187                Some(S::Utf8("abd".into())),
188                Some(S::Decimal(Decimal::PositiveInf)),
189            ]);
190            let row3 = OwnedRow::new(vec![
191                Some(S::Utf8("abc".into())),
192                Some(S::Decimal(Decimal::NegativeInf)),
193            ]);
194            let rows = vec![row1.clone(), row2.clone(), row3.clone()];
195            let mut array = vec![];
196            for row in &rows {
197                let mut row_bytes = vec![];
198                serde.serialize(row, &mut row_bytes);
199                array.push(row_bytes);
200            }
201            assert_eq!(serde.deserialize(&array[0]).unwrap(), row1);
202            assert_eq!(serde.deserialize(&array[1]).unwrap(), row2);
203            assert_eq!(serde.deserialize(&array[2]).unwrap(), row3);
204        }
205    }
206
207    #[test]
208    fn test_deserialize_with_column_indices() {
209        let order_types = vec![OrderType::descending(), OrderType::ascending()];
210
211        let schema = vec![DataType::Varchar, DataType::Int16];
212        let serde = OrderedRowSerde::new(schema, order_types);
213        let row1 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(5))]);
214        let rows = vec![row1.clone()];
215        let mut array = vec![];
216        for row in &rows {
217            let mut row_bytes = vec![];
218            serde.serialize(row, &mut row_bytes);
219            array.push(row_bytes);
220        }
221
222        {
223            let row_0_idx_0_len = serde.deserialize_prefix_len(&array[0], 1).unwrap();
224
225            let schema = vec![DataType::Varchar];
226            let order_types = vec![OrderType::descending()];
227            let deserde = OrderedRowSerde::new(schema, order_types);
228            let prefix_slice = &array[0][0..row_0_idx_0_len];
229            assert_eq!(
230                deserde.deserialize(prefix_slice).unwrap(),
231                OwnedRow::new(vec![Some(S::Utf8("abc".into()))])
232            );
233        }
234
235        {
236            let row_0_idx_1_len = serde.deserialize_prefix_len(&array[0], 2).unwrap();
237
238            let order_types = vec![OrderType::descending(), OrderType::ascending()];
239            let schema = vec![DataType::Varchar, DataType::Int16];
240            let deserde = OrderedRowSerde::new(schema, order_types);
241            let prefix_slice = &array[0][0..row_0_idx_1_len];
242            assert_eq!(deserde.deserialize(prefix_slice).unwrap(), row1);
243        }
244    }
245
246    #[test]
247    fn test_encoding_data_size() {
248        use std::mem::size_of;
249
250        use crate::types::{F64, Interval};
251
252        let order_types = vec![OrderType::ascending()];
253        let schema = vec![DataType::Int16];
254        let serde = OrderedRowSerde::new(schema, order_types.clone());
255
256        // test fixed_size
257        {
258            {
259                // test None
260                let row = OwnedRow::new(vec![None]);
261                let mut row_bytes = vec![];
262                serde.serialize(&row, &mut row_bytes);
263                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
264                    &DataType::Int16,
265                    order_types[0],
266                    &row_bytes[..],
267                )
268                .unwrap();
269                assert_eq!(1, encoding_data_size);
270            }
271
272            {
273                // float64
274                let row = OwnedRow::new(vec![Some(S::Float64(6.4.into()))]);
275                let mut row_bytes = vec![];
276                serde.serialize(&row, &mut row_bytes);
277                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
278                    &DataType::Float64,
279                    order_types[0],
280                    &row_bytes[..],
281                )
282                .unwrap();
283                let data_size = size_of::<F64>();
284                assert_eq!(8, data_size);
285                assert_eq!(1 + data_size, encoding_data_size);
286            }
287
288            {
289                // bool
290                let row = OwnedRow::new(vec![Some(S::Bool(false))]);
291                let mut row_bytes = vec![];
292                serde.serialize(&row, &mut row_bytes);
293                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
294                    &DataType::Boolean,
295                    order_types[0],
296                    &row_bytes[..],
297                )
298                .unwrap();
299
300                let data_size = size_of::<u8>();
301                assert_eq!(1, data_size);
302                assert_eq!(1 + data_size, encoding_data_size);
303            }
304
305            {
306                // ts
307                let row = OwnedRow::new(vec![Some(S::Timestamp(Default::default()))]);
308                let mut row_bytes = vec![];
309                serde.serialize(&row, &mut row_bytes);
310                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
311                    &DataType::Timestamp,
312                    order_types[0],
313                    &row_bytes[..],
314                )
315                .unwrap();
316                let data_size = size_of::<Timestamp>();
317                assert_eq!(12, data_size);
318                assert_eq!(1 + data_size, encoding_data_size);
319            }
320
321            {
322                // tz
323                let row = OwnedRow::new(vec![Some(S::Int64(1111111111))]);
324                let mut row_bytes = vec![];
325                serde.serialize(&row, &mut row_bytes);
326                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
327                    &DataType::Timestamptz,
328                    order_types[0],
329                    &row_bytes[..],
330                )
331                .unwrap();
332                let data_size = size_of::<i64>();
333                assert_eq!(8, data_size);
334                assert_eq!(1 + data_size, encoding_data_size);
335            }
336
337            {
338                // interval
339                let row = OwnedRow::new(vec![Some(S::Interval(Interval::default()))]);
340                let mut row_bytes = vec![];
341                serde.serialize(&row, &mut row_bytes);
342                let encoding_data_size = memcmp_encoding::calculate_encoded_size(
343                    &DataType::Interval,
344                    order_types[0],
345                    &row_bytes[..],
346                )
347                .unwrap();
348                let data_size = size_of::<Interval>();
349                assert_eq!(16, data_size);
350                assert_eq!(1 + data_size, encoding_data_size);
351            }
352        }
353
354        {
355            // test dynamic_size
356            {
357                // test decimal
358                pub use crate::types::Decimal;
359
360                {
361                    let d = Decimal::from_str("41721.900909090909090909090909").unwrap();
362                    let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
363                    let mut row_bytes = vec![];
364                    serde.serialize(&row, &mut row_bytes);
365                    let encoding_data_size = memcmp_encoding::calculate_encoded_size(
366                        &DataType::Decimal,
367                        order_types[0],
368                        &row_bytes[..],
369                    )
370                    .unwrap();
371                    // [nulltag, flag, decimal_chunk]
372                    assert_eq!(17, encoding_data_size);
373                }
374
375                {
376                    let d = Decimal::from_str("1").unwrap();
377                    let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
378                    let mut row_bytes = vec![];
379                    serde.serialize(&row, &mut row_bytes);
380                    let encoding_data_size = memcmp_encoding::calculate_encoded_size(
381                        &DataType::Decimal,
382                        order_types[0],
383                        &row_bytes[..],
384                    )
385                    .unwrap();
386                    // [nulltag, flag, decimal_chunk]
387                    assert_eq!(3, encoding_data_size);
388                }
389
390                {
391                    let d = Decimal::from_str("inf").unwrap();
392                    let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
393                    let mut row_bytes = vec![];
394                    serde.serialize(&row, &mut row_bytes);
395                    let encoding_data_size = memcmp_encoding::calculate_encoded_size(
396                        &DataType::Decimal,
397                        order_types[0],
398                        &row_bytes[..],
399                    )
400                    .unwrap();
401
402                    assert_eq!(2, encoding_data_size); // [1, 35]
403                }
404
405                {
406                    let d = Decimal::from_str("nan").unwrap();
407                    let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
408                    let mut row_bytes = vec![];
409                    serde.serialize(&row, &mut row_bytes);
410                    let encoding_data_size = memcmp_encoding::calculate_encoded_size(
411                        &DataType::Decimal,
412                        order_types[0],
413                        &row_bytes[..],
414                    )
415                    .unwrap();
416                    assert_eq!(2, encoding_data_size); // [1, 6]
417                }
418
419                {
420                    // TODO(test list / struct)
421                }
422
423                {
424                    // test varchar
425                    let varchar = "abcdefghijklmn";
426                    let row = OwnedRow::new(vec![Some(S::Utf8(varchar.into()))]);
427                    let mut row_bytes = vec![];
428                    serde.serialize(&row, &mut row_bytes);
429                    let encoding_data_size = memcmp_encoding::calculate_encoded_size(
430                        &DataType::Varchar,
431                        order_types[0],
432                        &row_bytes[..],
433                    )
434                    .unwrap();
435                    // [1, 1, 97, 98, 99, 100, 101, 102, 103, 104, 9, 105, 106, 107, 108, 109, 110,
436                    // 0, 0, 6]
437                    assert_eq!(6 + varchar.len(), encoding_data_size);
438                }
439
440                {
441                    {
442                        // test varchar Descending
443                        let order_types = vec![OrderType::descending()];
444                        let schema = vec![DataType::Varchar];
445                        let serde = OrderedRowSerde::new(schema, order_types.clone());
446                        let varchar = "abcdefghijklmnopq";
447                        let row = OwnedRow::new(vec![Some(S::Utf8(varchar.into()))]);
448                        let mut row_bytes = vec![];
449                        serde.serialize(&row, &mut row_bytes);
450                        let encoding_data_size = memcmp_encoding::calculate_encoded_size(
451                            &DataType::Varchar,
452                            order_types[0],
453                            &row_bytes[..],
454                        )
455                        .unwrap();
456
457                        // [254, 254, 158, 157, 156, 155, 154, 153, 152, 151, 246, 150, 149, 148,
458                        // 147, 146, 145, 144, 143, 246, 142, 255, 255, 255, 255, 255, 255, 255,
459                        // 254]
460                        assert_eq!(12 + varchar.len(), encoding_data_size);
461                    }
462                }
463            }
464        }
465    }
466}