risingwave_common/util/value_encoding/
column_aware_row_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Column-aware row encoding is an encoding format which converts a row into a binary form that
//! remains explainable after schema changes.
//! The current design of the flag contains just one meaningful piece of information: the 2 LSBs
//! represent the size of the offsets: `u8`/`u16`/`u32`.
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until the schema changes.
21
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38/// Serialize and deserialize functions that recursively use [`ColumnAwareSerde`] for nested fields
39/// of composite types.
40///
/// Only used for columns whose types can be altered ([`DataType::can_alter`]).
42mod new_serde {
43    use super::*;
44    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45    use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47    // --- serialize ---
48
49    // Copy of `plain` but call `new_` for the scalar.
50    fn new_serialize_datum(
51        data_type: &DataType,
52        datum_ref: impl ToDatumRef,
53        buf: &mut impl BufMut,
54    ) {
55        if let Some(d) = datum_ref.to_datum_ref() {
56            buf.put_u8(1);
57            new_serialize_scalar(data_type, d, buf)
58        } else {
59            buf.put_u8(0);
60        }
61    }
62
63    // Different logic from `plain`:
64    //
65    // Recursively construct a new column-aware `Serializer` for nested fields.
66    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67        let serializer = super::Serializer::from_struct(struct_type.clone()); // cloning `StructType` is lightweight
68
69        // `StructRef` is interpreted as a `Row` here.
70        let bytes = serializer.serialize(value); // TODO: serialize into the buf directly if we can reserve space accurately
71        buf.put_u32_le(bytes.len() as _);
72        buf.put_slice(&bytes);
73    }
74
75    // Copy of `plain` but call `new_` for the elements.
76    fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77        let elems = value.iter();
78        buf.put_u32_le(elems.len() as u32);
79
80        elems.for_each(|field_value| {
81            new_serialize_datum(inner_type, field_value, buf);
82        });
83    }
84
85    // Different logic from `plain`:
86    //
87    // We don't reuse the serialization of `struct<k, v>[]` for map, as it introduces overhead
88    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
89    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90        let elems = value.iter();
91        buf.put_u32_le(elems.len() as u32);
92
93        elems.for_each(|(k, v)| {
94            new_serialize_scalar(map_type.key(), k, buf);
95            new_serialize_datum(map_type.value(), v, buf);
96        });
97    }
98
99    // Copy of `plain` but call `new_` for composite types.
100    pub fn new_serialize_scalar(
101        data_type: &DataType,
102        value: ScalarRefImpl<'_>,
103        buf: &mut impl BufMut,
104    ) {
105        match value {
106            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
108            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110            _ => plain::serialize_scalar(value, buf),
111        }
112    }
113
114    // --- deserialize ---
115
116    // Copy of `plain` but call `new_` for the scalar.
117    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118        let null_tag = data.get_u8();
119        match null_tag {
120            0 => Ok(None),
121            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123        }
124    }
125
126    // Different logic from `plain`:
127    //
128    // Recursively construct a new column-aware `Deserializer` for nested fields.
129    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130        let deserializer = super::Deserializer::from_struct(struct_def.clone()); // cloning `StructType` is lightweight
131        let encoded_len = data.get_u32_le() as usize;
132
133        let (struct_data, remaining) = data.split_at(encoded_len);
134        *data = remaining;
135        let fields = deserializer.deserialize(struct_data)?;
136
137        Ok(ScalarImpl::Struct(StructValue::new(fields)))
138    }
139
140    // Copy of `plain` but call `new_` for the elements.
141    fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142        let len = data.get_u32_le();
143        let mut builder = item_type.create_array_builder(len as usize);
144        for _ in 0..len {
145            builder.append(new_inner_deserialize_datum(data, item_type)?);
146        }
147        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148    }
149
150    // Different logic from `plain`:
151    //
152    // We don't reuse the deserialization of `struct<k, v>[]` for map, as it introduces overhead
153    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
154    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155        let len = data.get_u32_le();
156        let mut builder = map_type
157            .clone() // FIXME: clone type everytime here is inefficient
158            .into_struct()
159            .create_array_builder(len as usize);
160        for _ in 0..len {
161            let key = new_deserialize_scalar(map_type.key(), data)?;
162            let value = new_inner_deserialize_datum(data, map_type.value())?;
163            let entry = StructValue::new(vec![Some(key), value]);
164            builder.append(Some(ScalarImpl::Struct(entry)));
165        }
166        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167            builder.finish(),
168        ))))
169    }
170
171    // Copy of `plain` but call `new_` for composite types.
172    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173        Ok(match ty {
174            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175            DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
178
179            data_types::simple!() => plain::deserialize_value(ty, data)?,
180        })
181    }
182}
183
/// When a row has no more than this many columns, we use the stack for some intermediate buffers.
185const COLUMN_ON_STACK: usize = 8;
186
/// The width of the offset of the encoded data, i.e., how many bytes are used to represent the
/// offset.
///
/// The discriminants are the 2-bit values stored in the header; `0b00` is not a valid width.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// The offset of encoded data can be represented by u8.
    Offset8 = 0b01,
    /// The offset of encoded data can be represented by u16.
    Offset16 = 0b10,
    /// The offset of encoded data can be represented by u32.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Get the width of the offset in bytes.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => 1,
            Self::Offset16 => 2,
            Self::Offset32 => 4,
        }
    }

    /// Returns the 2-bit representation stored in the header.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Parses the 2-bit representation stored in the header.
    ///
    /// Panics if `bits` is not one of the three known discriminants.
    const fn from_bits(bits: u8) -> Self {
        if bits == Self::Offset8 as u8 {
            Self::Offset8
        } else if bits == Self::Offset16 as u8 {
            Self::Offset16
        } else if bits == Self::Offset32 as u8 {
            Self::Offset32
        } else {
            panic!("invalid offset width bits")
        }
    }
}
222
/// Header (metadata) of the encoded row, packed into a single byte.
///
/// Layout (most to least significant bits):
///
/// ```text
/// | magic | reserved | offset |
/// |   1   |    5     |   2    |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Magic bit to indicate it's column-aware encoding. It defaults to `true` and is
    /// read-only.
    ///
    /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability,
    /// of which the most significant bit is always 0.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    /// Reserved bits between the magic bit and the offset width.
    #[bits(5)]
    _reserved: u8,

    /// Indicate the offset width of the encoded data. Defaults to [`OffsetWidth::Offset8`].
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
247
/// `RowEncoding` holds row-specific information for Column-Aware Encoding
struct RowEncoding {
    /// Header byte; its offset width is chosen when the offsets are set.
    header: Header,
    /// Start offset of every datum within `data`, each stored little-endian using the width
    /// recorded in `header`.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Encoded datums, appended back to back in column order.
    data: Vec<u8>,
}
254
/// A trait unifying [`ToDatumRef`] and already encoded bytes.
trait Encode {
    /// Appends the encoded form of `self` to `data`.
    ///
    /// In both implementations in this file, a `None` value appends nothing.
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}
259
260impl<T> Encode for T
261where
262    T: ToDatumRef,
263{
264    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
265        if let Some(v) = self.to_datum_ref() {
266            // Use different encoding logic for alterable types.
267            if data_type.can_alter() == Some(true) {
268                new_serde::new_serialize_scalar(data_type, v, data);
269            } else {
270                plain::serialize_scalar(v, data);
271            }
272        }
273    }
274}
275
276impl Encode for Option<&[u8]> {
277    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
278        // Already encoded, just copy the bytes.
279        if let Some(v) = self {
280            data.extend(v);
281        }
282    }
283}
284
285impl RowEncoding {
286    fn new() -> Self {
287        RowEncoding {
288            header: Header::new(),
289            offsets: Default::default(),
290            data: Default::default(),
291        }
292    }
293
294    fn set_offsets(&mut self, usize_offsets: &[usize]) {
295        debug_assert!(
296            self.offsets.is_empty(),
297            "should not set offsets multiple times"
298        );
299
300        // Use 0 if there's no data.
301        let max_offset = usize_offsets.last().copied().unwrap_or(0);
302
303        let offset_width = match max_offset {
304            _n @ ..=const { u8::MAX as usize } => OffsetWidth::Offset8,
305            _n @ ..=const { u16::MAX as usize } => OffsetWidth::Offset16,
306            _n @ ..=const { u32::MAX as usize } => OffsetWidth::Offset32,
307            _ => panic!("encoding length {} exceeds u32", max_offset),
308        };
309        self.header.set_offset(offset_width);
310
311        self.offsets
312            .resize(usize_offsets.len() * offset_width.width(), 0);
313
314        let mut offsets_buf = &mut self.offsets[..];
315        for &offset in usize_offsets {
316            offsets_buf.put_uint_le(offset as u64, offset_width.width());
317        }
318    }
319
320    fn encode<T: Encode>(
321        &mut self,
322        datums: impl IntoIterator<Item = T>,
323        data_types: impl IntoIterator<Item = &DataType>,
324    ) {
325        debug_assert!(
326            self.data.is_empty(),
327            "should not encode one RowEncoding object multiple times."
328        );
329        let datums = datums.into_iter();
330        let mut offset_usize =
331            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
332        for (datum, data_type) in datums.zip_eq_debug(data_types) {
333            offset_usize.push(self.data.len());
334            datum.encode_to(data_type, &mut self.data);
335        }
336        self.set_offsets(&offset_usize);
337    }
338}
339
340mod data_types {
341    use crate::types::{DataType, StructType};
342
343    /// A trait unifying data types of a row and field types of a struct.
344    pub trait DataTypes: Clone {
345        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
346        fn at(&self, index: usize) -> &DataType;
347    }
348
349    impl<T> DataTypes for T
350    where
351        T: AsRef<[DataType]> + Clone,
352    {
353        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
354            self.as_ref().iter()
355        }
356
357        fn at(&self, index: usize) -> &DataType {
358            &self.as_ref()[index]
359        }
360    }
361
362    impl DataTypes for StructType {
363        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
364            self.types()
365        }
366
367        fn at(&self, index: usize) -> &DataType {
368            self.type_at(index)
369        }
370    }
371}
372use data_types::DataTypes;
373
/// Column-Aware `Serializer` holds schema related information, and shall be
/// created again once the schema changes
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids of the schema, pre-encoded so they can be copied into every row as is.
    encoded_column_ids: EncodedColumnIds,
    /// Data types of the columns, in the same order as the column ids.
    data_types: D,
}
381
382type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
383
384fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
385    // currently we hard-code ColumnId as i32
386    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
387    let mut buf = &mut encoded_column_ids[..];
388    for id in column_ids {
389        buf.put_i32_le(id.get_id());
390    }
391    encoded_column_ids
392}
393
394impl Serializer {
395    /// Create a new `Serializer` with given `column_ids` and `data_types`.
396    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
397        Self {
398            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
399            data_types: data_types.into_iter().collect(),
400        }
401    }
402}
403
404impl Serializer<StructType> {
405    /// Create a new `Serializer` for the fields of the given struct.
406    ///
407    /// Panic if the struct type does not have field ids.
408    pub fn from_struct(struct_type: StructType) -> Self {
409        Self {
410            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
411            data_types: struct_type,
412        }
413    }
414}
415
416impl<D: DataTypes> Serializer<D> {
417    fn datum_num(&self) -> usize {
418        self.encoded_column_ids.len() / 4
419    }
420
421    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
422        let mut encoding = RowEncoding::new();
423        encoding.encode(datums, self.data_types.iter());
424        self.finish(encoding)
425    }
426
427    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
428        let mut row_bytes = Vec::with_capacity(
429            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), /* 5 comes from u8+u32 */
430        );
431        row_bytes.put_u8(encoding.header.into_bits());
432        row_bytes.put_u32_le(self.datum_num() as u32);
433        row_bytes.extend(&self.encoded_column_ids);
434        row_bytes.extend(&encoding.offsets);
435        row_bytes.extend(&encoding.data);
436
437        row_bytes
438    }
439}
440
441impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
442    /// Serialize a row under the schema of the Serializer
443    fn serialize(&self, row: impl Row) -> Vec<u8> {
444        assert_eq!(row.len(), self.datum_num());
445        self.serialize_raw(row.iter())
446    }
447}
448
/// A view of the encoded bytes, which can be iterated over to get the column id and data.
/// Used for deserialization.
// TODO: can we unify this with `RowEncoding`, which is for serialization?
#[derive(Clone)]
struct EncodedBytes<'a> {
    /// Header byte of the row; provides the width of each entry in `offsets`.
    header: Header,

    // When iterating, we will consume `column_ids` and `offsets` while keeping `data` unchanged.
    // This is because we record absolute values in `offsets` to index into `data`.
    /// Remaining column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining offsets, one per remaining column id.
    offsets: &'a [u8],
    /// The data section of the row.
    data: &'a [u8],
}
462
463impl<'a> EncodedBytes<'a> {
464    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
465        let header = Header::from_bits(encoded_bytes.get_u8());
466        if !header.magic() {
467            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
468        }
469        let offset_bytes = header.offset().width();
470
471        let datum_num = encoded_bytes.get_u32_le() as usize;
472        let offsets_start_idx = 4 * datum_num;
473        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
474
475        Ok(EncodedBytes {
476            header,
477            column_ids: &encoded_bytes[..offsets_start_idx],
478            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
479            data: &encoded_bytes[data_start_idx..],
480        })
481    }
482}
483
484impl<'a> Iterator for EncodedBytes<'a> {
485    type Item = (i32, &'a [u8]);
486
487    fn next(&mut self) -> Option<Self::Item> {
488        if self.column_ids.is_empty() {
489            assert!(self.offsets.is_empty());
490            return None;
491        }
492
493        let id = self.column_ids.get_i32_le();
494
495        let offset_width = self.header.offset().width();
496        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
497
498        let this_offset = get_offset(&mut self.offsets);
499        let next_offset = if self.offsets.is_empty() {
500            self.data.len()
501        } else {
502            let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position
503            get_offset(&mut peek_offsets)
504        };
505
506        let data = &self.data[this_offset..next_offset];
507
508        Some((id, data))
509    }
510}
511
/// Column-Aware `Deserializer` holds needed `ColumnIds` and their corresponding schema
/// Should non-null default values be specified, a new field could be added to Deserializer
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps a column id found in the encoded row to its index in `data_types`.
    mapping: ColumnMapping,
    /// Data types of the columns to decode.
    data_types: D,

    /// A row with default values for each column.
    ///
    /// `None` if all default values are `NULL`, typically for struct fields.
    default_row: Option<Vec<Datum>>,
}
524
525/// A mapping from column id to the index of the column in the schema.
526#[derive(Clone)]
527enum ColumnMapping {
528    /// For small number of columns, use linear search with `SmallVec`. This ensures no heap allocation.
529    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
530    /// For larger number of columns, build a `HashMap` for faster lookup.
531    Large(HashMap<i32, usize>),
532}
533
534impl ColumnMapping {
535    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
536        if column_ids.len() <= COLUMN_ON_STACK {
537            Self::Small(column_ids.map(|c| c.get_id()).collect())
538        } else {
539            Self::Large(
540                column_ids
541                    .enumerate()
542                    .map(|(i, c)| (c.get_id(), i))
543                    .collect(),
544            )
545        }
546    }
547
548    fn get(&self, id: i32) -> Option<usize> {
549        match self {
550            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
551            ColumnMapping::Large(map) => map.get(&id).copied(),
552        }
553    }
554}
555
556impl Deserializer {
557    pub fn new(
558        column_ids: &[ColumnId],
559        schema: Arc<[DataType]>,
560        column_with_default: impl Iterator<Item = (usize, Datum)>,
561    ) -> Self {
562        assert_eq!(column_ids.len(), schema.len());
563        let mut default_row: Vec<Datum> = vec![None; schema.len()];
564        for (i, datum) in column_with_default {
565            default_row[i] = datum;
566        }
567        Self {
568            mapping: ColumnMapping::new(column_ids.iter().copied()),
569            data_types: schema,
570            default_row: Some(default_row),
571        }
572    }
573}
574
575impl Deserializer<StructType> {
576    /// Create a new `Deserializer` for the fields of the given struct.
577    ///
578    /// Panic if the struct type does not have field ids.
579    pub fn from_struct(struct_type: StructType) -> Self {
580        Self {
581            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
582            data_types: struct_type,
583            default_row: None,
584        }
585    }
586}
587
588impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
589    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
590        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
591
592        let mut row =
593            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
594
595        for (id, mut data) in encoded_bytes {
596            let Some(decoded_idx) = self.mapping.get(id) else {
597                continue;
598            };
599            let data_type = self.data_types.at(decoded_idx);
600
601            let datum = if data.is_empty() {
602                None
603            } else if data_type.can_alter() == Some(true) {
604                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
605            } else {
606                Some(plain::deserialize_value(data_type, &mut data)?)
607            };
608
609            row[decoded_idx] = datum;
610        }
611
612        Ok(row)
613    }
614}
615
616/// Combined column-aware `Serializer` and `Deserializer` given the same
617/// `column_ids` and `schema`
618#[derive(Clone)]
619pub struct ColumnAwareSerde {
620    pub serializer: Serializer,
621    pub deserializer: Deserializer,
622}
623
624impl ValueRowSerializer for ColumnAwareSerde {
625    fn serialize(&self, row: impl Row) -> Vec<u8> {
626        self.serializer.serialize(row)
627    }
628}
629
630impl ValueRowDeserializer for ColumnAwareSerde {
631    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
632        self.deserializer.deserialize(encoded_bytes)
633    }
634}
635
636/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
637/// If no column is dropped, returns None.
638// TODO: we only support trimming dropped top-level columns here; also support nested fields.
639pub fn try_drop_invalid_columns(
640    encoded_bytes: &[u8],
641    valid_column_ids: &HashSet<i32>,
642) -> Option<Vec<u8>> {
643    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
644
645    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
646    if !has_invalid_column {
647        return None;
648    }
649
650    // Slow path that drops columns. Should be rare.
651
652    let mut datums = Vec::with_capacity(valid_column_ids.len());
653    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
654
655    for (id, data) in encoded_bytes {
656        if valid_column_ids.contains(&id) {
657            column_ids.push(ColumnId::new(id));
658            datums.push(if data.is_empty() { None } else { Some(data) });
659        }
660    }
661
662    // Data types are only needed when we are actually serializing. But we have encoded data here.
663    // Simple pass dummy data types.
664    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
665
666    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
667    Some(row_bytes)
668}