risingwave_storage/row_serde/
value_serde.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Value encoding is an encoding format which converts the data into a binary form (not
16//! memcomparable).
17use std::sync::Arc;
18
19use either::for_both;
20use futures::FutureExt;
21use itertools::Itertools;
22use risingwave_common::catalog::ColumnDesc;
23use risingwave_common::row::{OwnedRow, RowDeserializer as BasicDeserializer};
24use risingwave_common::types::*;
25use risingwave_common::util::value_encoding::column_aware_row_encoding::{
26    ColumnAwareSerde, Deserializer, Serializer,
27};
28use risingwave_common::util::value_encoding::error::ValueEncodingError;
29use risingwave_common::util::value_encoding::{
30    BasicSerde, BasicSerializer, DatumFromProtoExt, EitherSerde, ValueRowDeserializer,
31    ValueRowSerdeKind, ValueRowSerializer,
32};
33use risingwave_expr::expr::build_from_prost;
34use risingwave_pb::plan_common::DefaultColumnDesc;
35use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
36
37pub type Result<T> = std::result::Result<T, ValueEncodingError>;
38
39/// Part of `ValueRowSerde` that implements `new` a serde given `column_ids` and `schema`
40pub trait ValueRowSerdeNew: Clone {
41    fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> Self;
42}
43
44/// The compound trait used in `StateTableInner`, implemented by `BasicSerde` and `ColumnAwareSerde`
45pub trait ValueRowSerde:
46    ValueRowSerializer + ValueRowDeserializer + ValueRowSerdeNew + Sync + Send + 'static
47{
48    fn kind(&self) -> ValueRowSerdeKind;
49}
50
51impl ValueRowSerdeNew for EitherSerde {
52    fn new(_value_indices: Arc<[usize]>, _table_columns: Arc<[ColumnDesc]>) -> EitherSerde {
53        unreachable!("should construct manually")
54    }
55}
56
57impl ValueRowSerdeNew for BasicSerde {
58    fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> BasicSerde {
59        BasicSerde {
60            serializer: BasicSerializer {},
61            deserializer: BasicDeserializer::new(
62                value_indices
63                    .iter()
64                    .map(|idx| table_columns[*idx].data_type.clone())
65                    .collect_vec(),
66            ),
67        }
68    }
69}
70
71impl ValueRowSerde for EitherSerde {
72    fn kind(&self) -> ValueRowSerdeKind {
73        for_both!(&self.0, s => s.kind())
74    }
75}
76
77impl ValueRowSerde for BasicSerde {
78    fn kind(&self) -> ValueRowSerdeKind {
79        ValueRowSerdeKind::Basic
80    }
81}
82
83impl ValueRowSerdeNew for ColumnAwareSerde {
84    fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> ColumnAwareSerde {
85        let column_ids = value_indices
86            .iter()
87            .map(|idx| table_columns[*idx].column_id)
88            .collect_vec();
89        let schema = value_indices
90            .iter()
91            .map(|idx| table_columns[*idx].data_type.clone())
92            .collect_vec();
93        if cfg!(debug_assertions) {
94            let duplicates = column_ids.iter().duplicates().collect_vec();
95            if !duplicates.is_empty() {
96                panic!("duplicated column ids: {duplicates:?}");
97            }
98        }
99
100        let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
101        let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
102            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
103                snapshot_value,
104                expr,
105            })) = c.generated_or_default_column.clone()
106            {
107                // TODO: may not panic on error
108                let value = if let Some(snapshot_value) = snapshot_value {
109                    // If there's a `snapshot_value`, we can use it directly.
110                    Datum::from_protobuf(&snapshot_value, &c.data_type)
111                        .expect("invalid default value")
112                } else {
113                    // For backward compatibility, default columns in old tables may not have `snapshot_value`.
114                    // In this case, we need to evaluate the expression to get the default value.
115                    // It's okay since we previously banned impure expressions in default columns.
116                    build_from_prost(&expr.expect("expr should not be none"))
117                        .expect("build_from_prost error")
118                        .eval_row(&OwnedRow::empty())
119                        .now_or_never()
120                        .expect("constant expression should not be async")
121                        .expect("eval_row failed")
122                };
123                Some((i, value))
124            } else {
125                None
126            }
127        });
128
129        let serializer = Serializer::new(&column_ids, schema.clone());
130        let deserializer = Deserializer::new(&column_ids, schema.into(), column_with_default);
131        ColumnAwareSerde {
132            serializer,
133            deserializer,
134        }
135    }
136}
137
138impl ValueRowSerde for ColumnAwareSerde {
139    fn kind(&self) -> ValueRowSerdeKind {
140        ValueRowSerdeKind::ColumnAware
141    }
142}
143
#[cfg(test)]
mod tests {
    //! Byte-layout and round-trip tests for the column-aware value encoding.

    use std::collections::HashSet;

    use risingwave_common::catalog::ColumnId;
    use risingwave_common::row::Row;
    use risingwave_common::types::ScalarImpl::*;
    use risingwave_common::util::value_encoding::column_aware_row_encoding;
    use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;

    use super::*;

    /// Pins the exact bytes the column-aware serializer emits for a two-column row.
    #[test]
    fn test_row_encoding() {
        let column_ids = vec![ColumnId::new(0), ColumnId::new(1)];
        let row1 = OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abc".into()))]);
        let row2 = OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abd".into()))]);
        let row3 = OwnedRow::new(vec![Some(Int16(6)), Some(Utf8("abc".into()))]);
        let rows = vec![row1, row2, row3];
        let serializer = column_aware_row_encoding::Serializer::new(
            &column_ids,
            [DataType::Int16, DataType::Varchar],
        );
        let array = rows
            .iter()
            .map(|row| serializer.serialize(row))
            .collect_vec();
        let zero_le_bytes = 0_i32.to_le_bytes();
        let one_le_bytes = 1_i32.to_le_bytes();

        assert_eq!(
            array[0],
            [
                0b10000001, // flag mid WW mid BB
                2,
                0,
                0,
                0,                // column nums
                zero_le_bytes[0], // column id 0
                zero_le_bytes[1],
                zero_le_bytes[2],
                zero_le_bytes[3],
                one_le_bytes[0], // column id 1
                one_le_bytes[1],
                one_le_bytes[2],
                one_le_bytes[3],
                0, // offset0: 0
                2, // offset1: 2
                5, // i16: 5
                0,
                3, // str: abc
                0,
                0,
                0,
                b'a',
                b'b',
                b'c'
            ]
        );
    }

    /// Round-trips a two-column row through the serializer and a matching deserializer.
    #[test]
    fn test_row_decoding() {
        let column_ids = vec![ColumnId::new(0), ColumnId::new(1)];
        let row1 = OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abc".into()))]);
        let serializer = column_aware_row_encoding::Serializer::new(
            &column_ids,
            [DataType::Int16, DataType::Varchar],
        );
        let row_bytes = serializer.serialize(row1);
        let data_types = vec![DataType::Int16, DataType::Varchar];
        let deserializer = column_aware_row_encoding::Deserializer::new(
            &column_ids[..],
            Arc::from(data_types.into_boxed_slice()),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&row_bytes[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Int16(5)), Some(Utf8("abc".into()))]
        );
    }

    /// Round-trips a wide row (20000 non-null `Int16` columns) through `ColumnAwareSerde`.
    #[test]
    fn test_row_hard1() {
        let row = OwnedRow::new(vec![Some(Int16(233)); 20000]);
        let serde = ColumnAwareSerde::new(
            Arc::from_iter(0..20000),
            Arc::from_iter(
                (0..20000).map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int16)),
            ),
        );
        let encoded_bytes = serde.serialize(row);
        let decoded_row = serde.deserialize(&encoded_bytes);
        assert_eq!(decoded_row.unwrap(), vec![Some(Int16(233)); 20000]);
    }

    /// Round-trips a wide row mixing `Int16`, `Varchar` and NULL values.
    #[test]
    fn test_row_hard2() {
        let mut data = vec![Some(Int16(233)); 5000];
        data.extend(vec![None; 5000]);
        data.extend(vec![Some(Utf8("risingwave risingwave".into())); 5000]);
        data.extend(vec![None; 5000]);
        let row = OwnedRow::new(data.clone());
        let serde = ColumnAwareSerde::new(
            Arc::from_iter(0..20000),
            Arc::from_iter(
                (0..10000)
                    .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int16))
                    .chain(
                        (10000..20000)
                            .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Varchar)),
                    ),
            ),
        );
        let encoded_bytes = serde.serialize(row);
        let decoded_row = serde.deserialize(&encoded_bytes);
        assert_eq!(decoded_row.unwrap(), data);
    }

    /// Round-trips a very wide row (1,000,000 columns) mixing `Int64`, NULL and `Varchar`.
    #[test]
    fn test_row_hard3() {
        let mut data = vec![Some(Int64(233)); 500000];
        data.extend(vec![None; 250000]);
        data.extend(vec![Some(Utf8("risingwave risingwave".into())); 250000]);
        let row = OwnedRow::new(data.clone());
        let serde = ColumnAwareSerde::new(
            Arc::from_iter(0..1000000),
            Arc::from_iter(
                (0..500000)
                    .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int64))
                    .chain(
                        (500000..1000000)
                            .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Varchar)),
                    ),
            ),
        );
        let encoded_bytes = serde.serialize(row);
        let decoded_row = serde.deserialize(&encoded_bytes);
        assert_eq!(decoded_row.unwrap(), data);
    }

    /// Checks `try_drop_invalid_columns` for the no-op, partial-drop and drop-all cases.
    #[test]
    fn test_drop_column() {
        let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let row1 = OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]);
        let serializer = column_aware_row_encoding::Serializer::new(
            &column_ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let row_bytes = serializer.serialize(row1);

        // no columns is dropped
        assert!(
            try_drop_invalid_columns(&row_bytes, &[0, 1, 2, 3, 4].into_iter().collect()).is_none()
        );

        // column id 1 is dropped
        let row_bytes_dropped =
            try_drop_invalid_columns(&row_bytes, &[0, 2].into_iter().collect()).unwrap();
        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(0), ColumnId::new(2)],
            Arc::from(vec![DataType::Int16, DataType::Varchar].into_boxed_slice()),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&row_bytes_dropped[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Int16(5)), Some(Utf8("ABC".into()))]
        );

        // all columns are dropped
        let row_bytes_all_dropped = try_drop_invalid_columns(&row_bytes, &HashSet::new()).unwrap();
        assert_eq!(row_bytes_all_dropped.len(), 5); // 1 flag byte + 4 bytes for the column count (0)
        assert_eq!(&row_bytes_all_dropped[1..], [0, 0, 0, 0]);
    }

    /// Deserializes a subset of the encoded columns, in a different order than they were written.
    #[test]
    fn test_deserialize_partial_columns() {
        let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let row1 = OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]);
        let serializer = column_aware_row_encoding::Serializer::new(
            &column_ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let row_bytes = serializer.serialize(row1);

        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(2), ColumnId::new(0)],
            Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&row_bytes[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Utf8("ABC".into())), Some(Int16(5))]
        );
    }

    /// A column absent from the encoded bytes is filled from the registered default value.
    #[test]
    fn test_deserialize_partial_columns_with_default_columns() {
        let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let row1 = OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]);
        let serializer = column_aware_row_encoding::Serializer::new(
            &column_ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let row_bytes = serializer.serialize(row1);

        // default column of ColumnId::new(3), at position 1 of the deserializer's columns
        let default_columns = vec![(1, Some(Utf8("new column".into())))];

        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)],
            Arc::from(
                vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into_boxed_slice(),
            ),
            default_columns.into_iter(),
        );
        let decoded = deserializer.deserialize(&row_bytes[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![
                Some(Utf8("ABC".into())),
                Some(Utf8("new column".into())),
                Some(Int16(5))
            ]
        );
    }

    /// Pins the nested byte layout for a struct containing a map whose values are lists of
    /// structs, then round-trips it.
    #[test]
    fn test_row_composite_types() {
        let inner_struct: DataType =
            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                .with_ids([ColumnId::new(11), ColumnId::new(12)])
                .into();
        let list = DataType::List(Box::new(inner_struct.clone()));
        let map = MapType::from_kv(DataType::Varchar, list.clone()).into();
        let outer_struct = StructType::new([("f1", DataType::Int32), ("map", map)])
            .with_ids([ColumnId::new(1), ColumnId::new(2)])
            .into();

        let inner_struct_value = StructValue::new(vec![Some(Int32(6)), Some(Bool(true))]);
        let list_value =
            ListValue::from_datum_iter(&inner_struct, [Some(Struct(inner_struct_value))]);
        let map_value = MapValue::try_from_kv(
            ListValue::from_datum_iter(&DataType::Varchar, [Some(Utf8("key".into()))]),
            ListValue::from_datum_iter(&list, [Some(List(list_value))]),
        )
        .unwrap();
        let outer_struct_value = StructValue::new(vec![Some(Int32(5)), Some(Map(map_value))]);

        let datum = Some(Struct(outer_struct_value));
        let row1 = [datum];

        let column_ids = &[ColumnId::new(0)];
        let data_types = vec![outer_struct];

        let serializer = column_aware_row_encoding::Serializer::new(column_ids, data_types.clone());
        let row_bytes = serializer.serialize(row1.clone());

        assert_eq!(
            row_bytes,
            [
                0b10000001, // flag mid WW mid BB
                1, 0, 0, 0, // column num = 1
                0, 0, 0, 0, // column id = 0 "outer"
                0, // offset 0 = 0
                60, 0, 0, 0, // struct length = 60
                0b10000001, // recursive col-aware flag
                2, 0, 0, 0, // field num = 2
                1, 0, 0, 0, // field id = 1 "f1"
                2, 0, 0, 0, // field id = 2 "map"
                0, // offset 1 = 0
                4, // offset 2 = 4
                5, 0, 0, 0, // "f1": i32 = 5
                1, 0, 0, 0, // map length = 1
                3, 0, 0, 0, // map key string length = 3
                b'k', b'e', b'y', // map key = "key"
                1, // map value is non NULL
                1, 0, 0, 0, // map value list length = 1
                1, // map value list element (struct) is non NULL
                20, 0, 0, 0, // struct length = 20
                0b10000001, // recursive col-aware flag
                2, 0, 0, 0, // field num = 2
                11, 0, 0, 0, // field id = 11 "f2"
                12, 0, 0, 0, // field id = 12 "f3"
                0, // offset 11 = 0
                4, // offset 12 = 4
                6, 0, 0, 0, // "f2": i32 = 6
                1, // "f3": bool = true
            ]
        );

        let deserializer = column_aware_row_encoding::Deserializer::new(
            column_ids,
            data_types.into(),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&row_bytes).unwrap();

        assert_eq!(OwnedRow::new(decoded), row1.to_owned_row());
    }
}