1use std::sync::Arc;
18
19use either::for_both;
20use futures::FutureExt;
21use itertools::Itertools;
22use risingwave_common::catalog::ColumnDesc;
23use risingwave_common::row::{OwnedRow, RowDeserializer as BasicDeserializer};
24use risingwave_common::types::*;
25use risingwave_common::util::value_encoding::column_aware_row_encoding::{
26 ColumnAwareSerde, Deserializer, Serializer,
27};
28use risingwave_common::util::value_encoding::error::ValueEncodingError;
29use risingwave_common::util::value_encoding::{
30 BasicSerde, BasicSerializer, DatumFromProtoExt, EitherSerde, ValueRowDeserializer,
31 ValueRowSerdeKind, ValueRowSerializer,
32};
33use risingwave_expr::expr::build_from_prost;
34use risingwave_pb::plan_common::DefaultColumnDesc;
35use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
36
37pub type Result<T> = std::result::Result<T, ValueEncodingError>;
38
39pub trait ValueRowSerdeNew: Clone {
41 fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> Self;
42}
43
44pub trait ValueRowSerde:
46 ValueRowSerializer + ValueRowDeserializer + ValueRowSerdeNew + Sync + Send + 'static
47{
48 fn kind(&self) -> ValueRowSerdeKind;
49}
50
51impl ValueRowSerdeNew for EitherSerde {
52 fn new(_value_indices: Arc<[usize]>, _table_columns: Arc<[ColumnDesc]>) -> EitherSerde {
53 unreachable!("should construct manually")
54 }
55}
56
57impl ValueRowSerdeNew for BasicSerde {
58 fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> BasicSerde {
59 BasicSerde {
60 serializer: BasicSerializer {},
61 deserializer: BasicDeserializer::new(
62 value_indices
63 .iter()
64 .map(|idx| table_columns[*idx].data_type.clone())
65 .collect_vec(),
66 ),
67 }
68 }
69}
70
71impl ValueRowSerde for EitherSerde {
72 fn kind(&self) -> ValueRowSerdeKind {
73 for_both!(&self.0, s => s.kind())
74 }
75}
76
77impl ValueRowSerde for BasicSerde {
78 fn kind(&self) -> ValueRowSerdeKind {
79 ValueRowSerdeKind::Basic
80 }
81}
82
83impl ValueRowSerdeNew for ColumnAwareSerde {
84 fn new(value_indices: Arc<[usize]>, table_columns: Arc<[ColumnDesc]>) -> ColumnAwareSerde {
85 let column_ids = value_indices
86 .iter()
87 .map(|idx| table_columns[*idx].column_id)
88 .collect_vec();
89 let schema = value_indices
90 .iter()
91 .map(|idx| table_columns[*idx].data_type.clone())
92 .collect_vec();
93 if cfg!(debug_assertions) {
94 let duplicates = column_ids.iter().duplicates().collect_vec();
95 if !duplicates.is_empty() {
96 panic!("duplicated column ids: {duplicates:?}");
97 }
98 }
99
100 let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
101 let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
102 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
103 snapshot_value,
104 expr,
105 })) = c.generated_or_default_column.clone()
106 {
107 let value = if let Some(snapshot_value) = snapshot_value {
109 Datum::from_protobuf(&snapshot_value, &c.data_type)
111 .expect("invalid default value")
112 } else {
113 build_from_prost(&expr.expect("expr should not be none"))
117 .expect("build_from_prost error")
118 .eval_row(&OwnedRow::empty())
119 .now_or_never()
120 .expect("constant expression should not be async")
121 .expect("eval_row failed")
122 };
123 Some((i, value))
124 } else {
125 None
126 }
127 });
128
129 let serializer = Serializer::new(&column_ids, schema.clone());
130 let deserializer = Deserializer::new(&column_ids, schema.into(), column_with_default);
131 ColumnAwareSerde {
132 serializer,
133 deserializer,
134 }
135 }
136}
137
138impl ValueRowSerde for ColumnAwareSerde {
139 fn kind(&self) -> ValueRowSerdeKind {
140 ValueRowSerdeKind::ColumnAware
141 }
142}
143
#[cfg(test)]
mod tests {
    //! Round-trip and byte-layout tests for the column-aware row encoding.

    use std::collections::HashSet;

    use risingwave_common::catalog::ColumnId;
    use risingwave_common::row::Row;
    use risingwave_common::types::ScalarImpl::*;
    use risingwave_common::util::value_encoding::column_aware_row_encoding;
    use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;

    use super::*;

    /// Serializes three two-column rows and pins the exact bytes of the first.
    #[test]
    fn test_row_encoding() {
        let ids = vec![ColumnId::new(0), ColumnId::new(1)];
        let rows = vec![
            OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abc".into()))]),
            OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abd".into()))]),
            OwnedRow::new(vec![Some(Int16(6)), Some(Utf8("abc".into()))]),
        ];
        let serializer =
            column_aware_row_encoding::Serializer::new(&ids, [DataType::Int16, DataType::Varchar]);

        let encoded: Vec<_> = rows.iter().map(|row| serializer.serialize(row)).collect();

        let id0 = 0_i32.to_le_bytes();
        let id1 = 1_i32.to_le_bytes();
        assert_eq!(
            encoded[0],
            [
                // leading header byte (presumably encoding flags — confirm against the encoder)
                0b10000001,
                // 2 as a little-endian u32; matches the number of columns
                2,
                0,
                0,
                0,
                // column id 0
                id0[0],
                id0[1],
                id0[2],
                id0[3],
                // column id 1
                id1[0],
                id1[1],
                id1[2],
                id1[3],
                // one offset byte per column: the values start at 0 and 2
                0,
                2,
                // Int16(5), little endian
                5,
                0,
                // "abc": 4-byte little-endian length followed by the bytes
                3,
                0,
                0,
                0,
                b'a',
                b'b',
                b'c'
            ]
        );
    }

    /// A serialized row deserializes back to the same datums.
    #[test]
    fn test_row_decoding() {
        let ids = vec![ColumnId::new(0), ColumnId::new(1)];
        let serializer =
            column_aware_row_encoding::Serializer::new(&ids, [DataType::Int16, DataType::Varchar]);
        let encoded =
            serializer.serialize(OwnedRow::new(vec![Some(Int16(5)), Some(Utf8("abc".into()))]));

        let deserializer = column_aware_row_encoding::Deserializer::new(
            &ids[..],
            vec![DataType::Int16, DataType::Varchar].into(),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&encoded[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Int16(5)), Some(Utf8("abc".into()))]
        );
    }

    /// Round-trips a wide row of 20000 non-null `Int16` columns.
    #[test]
    fn test_row_hard1() {
        let data = vec![Some(Int16(233)); 20000];
        let value_indices: Arc<[usize]> = (0..20000).collect();
        let table_columns: Arc<[ColumnDesc]> = (0..20000)
            .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int16))
            .collect();
        let serde = ColumnAwareSerde::new(value_indices, table_columns);

        let encoded = serde.serialize(OwnedRow::new(data.clone()));
        let decoded = serde.deserialize(&encoded);
        assert_eq!(decoded.unwrap(), data);
    }

    /// Round-trips 20000 columns mixing `Int16`, `Varchar` and nulls.
    #[test]
    fn test_row_hard2() {
        let data = [
            vec![Some(Int16(233)); 5000],
            vec![None; 5000],
            vec![Some(Utf8("risingwave risingwave".into())); 5000],
            vec![None; 5000],
        ]
        .concat();

        let value_indices: Arc<[usize]> = (0..20000).collect();
        let int_columns =
            (0..10000).map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int16));
        let varchar_columns =
            (10000..20000).map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Varchar));
        let table_columns: Arc<[ColumnDesc]> = int_columns.chain(varchar_columns).collect();
        let serde = ColumnAwareSerde::new(value_indices, table_columns);

        let encoded = serde.serialize(OwnedRow::new(data.clone()));
        let decoded = serde.deserialize(&encoded);
        assert_eq!(decoded.unwrap(), data);
    }

    /// Round-trips one million columns mixing `Int64`, nulls and `Varchar`.
    #[test]
    fn test_row_hard3() {
        let data = [
            vec![Some(Int64(233)); 500000],
            vec![None; 250000],
            vec![Some(Utf8("risingwave risingwave".into())); 250000],
        ]
        .concat();

        let value_indices: Arc<[usize]> = (0..1000000).collect();
        let int_columns =
            (0..500000).map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int64));
        let varchar_columns =
            (500000..1000000).map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Varchar));
        let table_columns: Arc<[ColumnDesc]> = int_columns.chain(varchar_columns).collect();
        let serde = ColumnAwareSerde::new(value_indices, table_columns);

        let encoded = serde.serialize(OwnedRow::new(data.clone()));
        let decoded = serde.deserialize(&encoded);
        assert_eq!(decoded.unwrap(), data);
    }

    /// Exercises `try_drop_invalid_columns` with nothing, one column, and
    /// every column being dropped.
    #[test]
    fn test_drop_column() {
        let ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let serializer = column_aware_row_encoding::Serializer::new(
            &ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let encoded = serializer.serialize(OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]));

        // Every encoded column id is in the valid set: `None` is returned.
        assert!(
            try_drop_invalid_columns(&encoded, &[0, 1, 2, 3, 4].into_iter().collect()).is_none()
        );

        // Column id 1 is not in the valid set: the rewritten bytes hold only ids 0 and 2.
        let without_col1 =
            try_drop_invalid_columns(&encoded, &[0, 2].into_iter().collect()).unwrap();
        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(0), ColumnId::new(2)],
            vec![DataType::Int16, DataType::Varchar].into(),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&without_col1[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Int16(5)), Some(Utf8("ABC".into()))]
        );

        // Empty valid set: what remains is one leading byte followed by four zero bytes.
        let without_any = try_drop_invalid_columns(&encoded, &HashSet::new()).unwrap();
        assert_eq!(without_any.len(), 5);
        assert_eq!(&without_any[1..], [0, 0, 0, 0]);
    }

    /// A deserializer may request a reordered subset of the encoded columns.
    #[test]
    fn test_deserialize_partial_columns() {
        let ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let serializer = column_aware_row_encoding::Serializer::new(
            &ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let encoded = serializer.serialize(OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]));

        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(2), ColumnId::new(0)],
            vec![DataType::Varchar, DataType::Int16].into(),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&encoded[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![Some(Utf8("ABC".into())), Some(Int16(5))]
        );
    }

    /// A requested column that is absent from the encoded bytes (id 3) is
    /// filled from the default supplied for its position.
    #[test]
    fn test_deserialize_partial_columns_with_default_columns() {
        let ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
        let serializer = column_aware_row_encoding::Serializer::new(
            &ids,
            [DataType::Int16, DataType::Varchar, DataType::Varchar],
        );
        let encoded = serializer.serialize(OwnedRow::new(vec![
            Some(Int16(5)),
            Some(Utf8("abc".into())),
            Some(Utf8("ABC".into())),
        ]));

        // The default is keyed by output position 1, i.e. column id 3 below.
        let deserializer = column_aware_row_encoding::Deserializer::new(
            &[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)],
            vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into(),
            std::iter::once((1, Some(Utf8("new column".into())))),
        );
        let decoded = deserializer.deserialize(&encoded[..]);
        assert_eq!(
            decoded.unwrap(),
            vec![
                Some(Utf8("ABC".into())),
                Some(Utf8("new column".into())),
                Some(Int16(5))
            ]
        );
    }

    /// Pins the bytes of a struct → map → list → struct value and checks
    /// that it round-trips.
    #[test]
    fn test_row_composite_types() {
        let inner_struct: DataType =
            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                .with_ids([ColumnId::new(11), ColumnId::new(12)])
                .into();
        let list = DataType::List(Box::new(inner_struct.clone()));
        let map: DataType = MapType::from_kv(DataType::Varchar, list.clone()).into();
        let outer_struct: DataType = StructType::new([("f1", DataType::Int32), ("map", map)])
            .with_ids([ColumnId::new(1), ColumnId::new(2)])
            .into();

        let inner_value = StructValue::new(vec![Some(Int32(6)), Some(Bool(true))]);
        let list_value = ListValue::from_datum_iter(&inner_struct, [Some(Struct(inner_value))]);
        let map_value = MapValue::try_from_kv(
            ListValue::from_datum_iter(&DataType::Varchar, [Some(Utf8("key".into()))]),
            ListValue::from_datum_iter(&list, [Some(List(list_value))]),
        )
        .unwrap();
        let outer_value = StructValue::new(vec![Some(Int32(5)), Some(Map(map_value))]);

        let row1 = [Some(Struct(outer_value))];

        let column_ids = &[ColumnId::new(0)];
        let data_types = vec![outer_struct];

        let serializer = column_aware_row_encoding::Serializer::new(column_ids, data_types.clone());
        let row_bytes = serializer.serialize(row1.clone());

        assert_eq!(
            row_bytes,
            [
                // --- top-level row: header byte, 1 column, id 0, offset 0 ---
                0b10000001,
                1, 0, 0, 0,
                0, 0, 0, 0,
                0,
                // --- outer struct: length prefix, 60 bytes follow ---
                60, 0, 0, 0,
                0b10000001,
                2, 0, 0, 0, // 2 fields
                1, 0, 0, 0, // field id 1
                2, 0, 0, 0, // field id 2
                0, 4, // offsets of the two fields
                5, 0, 0, 0, // f1: Int32(5)
                // --- map with 1 entry ---
                1, 0, 0, 0,
                3, 0, 0, 0, b'k', b'e', b'y', // key: length 3 + "key"
                // --- list value: marker byte, then 1 element ---
                1,
                1, 0, 0, 0,
                // --- inner struct: marker byte, length prefix, 20 bytes follow ---
                1,
                20, 0, 0, 0,
                0b10000001,
                2, 0, 0, 0, // 2 fields
                11, 0, 0, 0, // field id 11
                12, 0, 0, 0, // field id 12
                0, 4, // offsets of the two fields
                6, 0, 0, 0, // f2: Int32(6)
                1, // f3: true
            ]
        );

        let deserializer = column_aware_row_encoding::Deserializer::new(
            column_ids,
            data_types.into(),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&row_bytes).unwrap();

        assert_eq!(OwnedRow::new(decoded), row1.to_owned_row());
    }
}