1use std::borrow::Cow;
16
17use bytes::BufMut;
18
19use crate::row::{OwnedRow, Row};
20use crate::types::{DataType, ToDatumRef};
21use crate::util::iter_util::{ZipEqDebug, ZipEqFast};
22use crate::util::memcmp_encoding;
23use crate::util::sort_util::OrderType;
24
25#[derive(Clone)]
27pub struct OrderedRowSerde {
28 schema: Vec<DataType>,
29 order_types: Vec<OrderType>,
30}
31
32impl OrderedRowSerde {
33 pub fn new(schema: Vec<DataType>, order_types: Vec<OrderType>) -> Self {
34 assert_eq!(schema.len(), order_types.len());
35 Self {
36 schema,
37 order_types,
38 }
39 }
40
41 #[must_use]
44 pub fn prefix(&self, len: usize) -> Cow<'_, Self> {
45 if len == self.order_types.len() {
46 Cow::Borrowed(self)
47 } else {
48 Cow::Owned(Self {
49 schema: self.schema[..len].to_vec(),
50 order_types: self.order_types[..len].to_vec(),
51 })
52 }
53 }
54
55 #[must_use]
56 pub fn index(&self, idx: usize) -> Cow<'_, Self> {
57 if 1 == self.order_types.len() {
58 Cow::Borrowed(self)
59 } else {
60 Cow::Owned(Self {
61 schema: vec![self.schema[idx].clone()],
62 order_types: vec![self.order_types[idx]],
63 })
64 }
65 }
66
67 pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
69 self.serialize_datums(row.iter(), append_to)
70 }
71
72 pub fn serialize_datums(
74 &self,
75 datum_refs: impl Iterator<Item = impl ToDatumRef>,
76 mut append_to: impl BufMut,
77 ) {
78 let mut serializer = memcomparable::Serializer::new(&mut append_to);
79 for (datum, order) in datum_refs.zip_eq_debug(self.order_types.iter().copied()) {
80 memcmp_encoding::serialize_datum(datum, order, &mut serializer).unwrap();
81 }
82 }
83
84 pub fn deserialize(&self, data: &[u8]) -> memcomparable::Result<OwnedRow> {
85 let mut values = Vec::with_capacity(self.schema.len());
86 let mut deserializer = memcomparable::Deserializer::new(data);
87 for (data_type, order) in self
88 .schema
89 .iter()
90 .zip_eq_fast(self.order_types.iter().copied())
91 {
92 let datum = memcmp_encoding::deserialize_datum(data_type, order, &mut deserializer)?;
93 values.push(datum);
94 }
95 Ok(OwnedRow::new(values))
96 }
97
98 pub fn get_order_types(&self) -> &[OrderType] {
99 &self.order_types
100 }
101
102 pub fn get_data_types(&self) -> &[DataType] {
103 &self.schema
104 }
105
106 pub fn deserialize_prefix_len(
107 &self,
108 key: &[u8],
109 prefix_len: usize,
110 ) -> memcomparable::Result<usize> {
111 let mut len: usize = 0;
112 for index in 0..prefix_len {
113 let data_type = &self.schema[index];
114 let data = &key[len..];
115 len +=
116 memcmp_encoding::calculate_encoded_size(data_type, self.order_types[index], data)?;
117 }
118 Ok(len)
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use std::str::FromStr;
125
126 use super::*;
127 use crate::types::{ScalarImpl as S, Timestamp};
128
129 #[test]
130 fn test_ordered_row_serializer() {
131 let orders = vec![OrderType::descending(), OrderType::ascending()];
132 let data_types = vec![DataType::Int16, DataType::Varchar];
133 let serializer = OrderedRowSerde::new(data_types, orders);
134 let row1 = OwnedRow::new(vec![Some(S::Int16(5)), Some(S::Utf8("abc".into()))]);
135 let row2 = OwnedRow::new(vec![Some(S::Int16(5)), Some(S::Utf8("abd".into()))]);
136 let row3 = OwnedRow::new(vec![Some(S::Int16(6)), Some(S::Utf8("abc".into()))]);
137 let rows = vec![row1, row2, row3];
138 let mut array = vec![];
139 for row in &rows {
140 let mut row_bytes = vec![];
141 serializer.serialize(row, &mut row_bytes);
142 array.push(row_bytes);
143 }
144 array.sort();
145 assert_eq!(array[0][2], !6i16.to_be_bytes()[1]);
147 assert_eq!(&array[1][3..], [0, 1, b'a', b'b', b'c', 0, 0, 0, 0, 0, 3u8]);
148 assert_eq!(&array[2][3..], [0, 1, b'a', b'b', b'd', 0, 0, 0, 0, 0, 3u8]);
149 }
150
151 #[test]
152 fn test_ordered_row_deserializer() {
153 use crate::types::*;
154 {
155 let order_types = vec![OrderType::descending(), OrderType::ascending()];
157
158 let schema = vec![DataType::Varchar, DataType::Int16];
159 let serde = OrderedRowSerde::new(schema, order_types);
160 let row1 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(5))]);
161 let row2 = OwnedRow::new(vec![Some(S::Utf8("abd".into())), Some(S::Int16(5))]);
162 let row3 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(6))]);
163 let rows = vec![row1.clone(), row2.clone(), row3.clone()];
164 let mut array = vec![];
165 for row in &rows {
166 let mut row_bytes = vec![];
167 serde.serialize(row, &mut row_bytes);
168 array.push(row_bytes);
169 }
170 assert_eq!(serde.deserialize(&array[0]).unwrap(), row1);
171 assert_eq!(serde.deserialize(&array[1]).unwrap(), row2);
172 assert_eq!(serde.deserialize(&array[2]).unwrap(), row3);
173 }
174
175 {
176 let order_types = vec![OrderType::descending(), OrderType::ascending()];
179
180 let schema = vec![DataType::Varchar, DataType::Decimal];
181 let serde = OrderedRowSerde::new(schema, order_types);
182 let row1 = OwnedRow::new(vec![
183 Some(S::Utf8("abc".into())),
184 Some(S::Decimal(Decimal::NaN)),
185 ]);
186 let row2 = OwnedRow::new(vec![
187 Some(S::Utf8("abd".into())),
188 Some(S::Decimal(Decimal::PositiveInf)),
189 ]);
190 let row3 = OwnedRow::new(vec![
191 Some(S::Utf8("abc".into())),
192 Some(S::Decimal(Decimal::NegativeInf)),
193 ]);
194 let rows = vec![row1.clone(), row2.clone(), row3.clone()];
195 let mut array = vec![];
196 for row in &rows {
197 let mut row_bytes = vec![];
198 serde.serialize(row, &mut row_bytes);
199 array.push(row_bytes);
200 }
201 assert_eq!(serde.deserialize(&array[0]).unwrap(), row1);
202 assert_eq!(serde.deserialize(&array[1]).unwrap(), row2);
203 assert_eq!(serde.deserialize(&array[2]).unwrap(), row3);
204 }
205 }
206
207 #[test]
208 fn test_deserialize_with_column_indices() {
209 let order_types = vec![OrderType::descending(), OrderType::ascending()];
210
211 let schema = vec![DataType::Varchar, DataType::Int16];
212 let serde = OrderedRowSerde::new(schema, order_types);
213 let row1 = OwnedRow::new(vec![Some(S::Utf8("abc".into())), Some(S::Int16(5))]);
214 let rows = vec![row1.clone()];
215 let mut array = vec![];
216 for row in &rows {
217 let mut row_bytes = vec![];
218 serde.serialize(row, &mut row_bytes);
219 array.push(row_bytes);
220 }
221
222 {
223 let row_0_idx_0_len = serde.deserialize_prefix_len(&array[0], 1).unwrap();
224
225 let schema = vec![DataType::Varchar];
226 let order_types = vec![OrderType::descending()];
227 let deserde = OrderedRowSerde::new(schema, order_types);
228 let prefix_slice = &array[0][0..row_0_idx_0_len];
229 assert_eq!(
230 deserde.deserialize(prefix_slice).unwrap(),
231 OwnedRow::new(vec![Some(S::Utf8("abc".into()))])
232 );
233 }
234
235 {
236 let row_0_idx_1_len = serde.deserialize_prefix_len(&array[0], 2).unwrap();
237
238 let order_types = vec![OrderType::descending(), OrderType::ascending()];
239 let schema = vec![DataType::Varchar, DataType::Int16];
240 let deserde = OrderedRowSerde::new(schema, order_types);
241 let prefix_slice = &array[0][0..row_0_idx_1_len];
242 assert_eq!(deserde.deserialize(prefix_slice).unwrap(), row1);
243 }
244 }
245
246 #[test]
247 fn test_encoding_data_size() {
248 use std::mem::size_of;
249
250 use crate::types::{F64, Interval};
251
252 let order_types = vec![OrderType::ascending()];
253 let schema = vec![DataType::Int16];
254 let serde = OrderedRowSerde::new(schema, order_types.clone());
255
256 {
258 {
259 let row = OwnedRow::new(vec![None]);
261 let mut row_bytes = vec![];
262 serde.serialize(&row, &mut row_bytes);
263 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
264 &DataType::Int16,
265 order_types[0],
266 &row_bytes[..],
267 )
268 .unwrap();
269 assert_eq!(1, encoding_data_size);
270 }
271
272 {
273 let row = OwnedRow::new(vec![Some(S::Float64(6.4.into()))]);
275 let mut row_bytes = vec![];
276 serde.serialize(&row, &mut row_bytes);
277 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
278 &DataType::Float64,
279 order_types[0],
280 &row_bytes[..],
281 )
282 .unwrap();
283 let data_size = size_of::<F64>();
284 assert_eq!(8, data_size);
285 assert_eq!(1 + data_size, encoding_data_size);
286 }
287
288 {
289 let row = OwnedRow::new(vec![Some(S::Bool(false))]);
291 let mut row_bytes = vec![];
292 serde.serialize(&row, &mut row_bytes);
293 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
294 &DataType::Boolean,
295 order_types[0],
296 &row_bytes[..],
297 )
298 .unwrap();
299
300 let data_size = size_of::<u8>();
301 assert_eq!(1, data_size);
302 assert_eq!(1 + data_size, encoding_data_size);
303 }
304
305 {
306 let row = OwnedRow::new(vec![Some(S::Timestamp(Default::default()))]);
308 let mut row_bytes = vec![];
309 serde.serialize(&row, &mut row_bytes);
310 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
311 &DataType::Timestamp,
312 order_types[0],
313 &row_bytes[..],
314 )
315 .unwrap();
316 let data_size = size_of::<Timestamp>();
317 assert_eq!(12, data_size);
318 assert_eq!(1 + data_size, encoding_data_size);
319 }
320
321 {
322 let row = OwnedRow::new(vec![Some(S::Int64(1111111111))]);
324 let mut row_bytes = vec![];
325 serde.serialize(&row, &mut row_bytes);
326 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
327 &DataType::Timestamptz,
328 order_types[0],
329 &row_bytes[..],
330 )
331 .unwrap();
332 let data_size = size_of::<i64>();
333 assert_eq!(8, data_size);
334 assert_eq!(1 + data_size, encoding_data_size);
335 }
336
337 {
338 let row = OwnedRow::new(vec![Some(S::Interval(Interval::default()))]);
340 let mut row_bytes = vec![];
341 serde.serialize(&row, &mut row_bytes);
342 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
343 &DataType::Interval,
344 order_types[0],
345 &row_bytes[..],
346 )
347 .unwrap();
348 let data_size = size_of::<Interval>();
349 assert_eq!(16, data_size);
350 assert_eq!(1 + data_size, encoding_data_size);
351 }
352 }
353
354 {
355 {
357 pub use crate::types::Decimal;
359
360 {
361 let d = Decimal::from_str("41721.900909090909090909090909").unwrap();
362 let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
363 let mut row_bytes = vec![];
364 serde.serialize(&row, &mut row_bytes);
365 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
366 &DataType::Decimal,
367 order_types[0],
368 &row_bytes[..],
369 )
370 .unwrap();
371 assert_eq!(17, encoding_data_size);
373 }
374
375 {
376 let d = Decimal::from_str("1").unwrap();
377 let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
378 let mut row_bytes = vec![];
379 serde.serialize(&row, &mut row_bytes);
380 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
381 &DataType::Decimal,
382 order_types[0],
383 &row_bytes[..],
384 )
385 .unwrap();
386 assert_eq!(3, encoding_data_size);
388 }
389
390 {
391 let d = Decimal::from_str("inf").unwrap();
392 let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
393 let mut row_bytes = vec![];
394 serde.serialize(&row, &mut row_bytes);
395 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
396 &DataType::Decimal,
397 order_types[0],
398 &row_bytes[..],
399 )
400 .unwrap();
401
402 assert_eq!(2, encoding_data_size); }
404
405 {
406 let d = Decimal::from_str("nan").unwrap();
407 let row = OwnedRow::new(vec![Some(S::Decimal(d))]);
408 let mut row_bytes = vec![];
409 serde.serialize(&row, &mut row_bytes);
410 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
411 &DataType::Decimal,
412 order_types[0],
413 &row_bytes[..],
414 )
415 .unwrap();
416 assert_eq!(2, encoding_data_size); }
418
419 {
420 }
422
423 {
424 let varchar = "abcdefghijklmn";
426 let row = OwnedRow::new(vec![Some(S::Utf8(varchar.into()))]);
427 let mut row_bytes = vec![];
428 serde.serialize(&row, &mut row_bytes);
429 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
430 &DataType::Varchar,
431 order_types[0],
432 &row_bytes[..],
433 )
434 .unwrap();
435 assert_eq!(6 + varchar.len(), encoding_data_size);
438 }
439
440 {
441 {
442 let order_types = vec![OrderType::descending()];
444 let schema = vec![DataType::Varchar];
445 let serde = OrderedRowSerde::new(schema, order_types.clone());
446 let varchar = "abcdefghijklmnopq";
447 let row = OwnedRow::new(vec![Some(S::Utf8(varchar.into()))]);
448 let mut row_bytes = vec![];
449 serde.serialize(&row, &mut row_bytes);
450 let encoding_data_size = memcmp_encoding::calculate_encoded_size(
451 &DataType::Varchar,
452 order_types[0],
453 &row_bytes[..],
454 )
455 .unwrap();
456
457 assert_eq!(12 + varchar.len(), encoding_data_size);
461 }
462 }
463 }
464 }
465 }
466}