risingwave_connector/sink/encoder/
bytes.rs1use risingwave_common::catalog::Schema;
16use risingwave_common::types::DataType;
17
18use super::RowEncoder;
19
20pub struct BytesEncoder {
21 pub schema: Schema,
22 pub col_index: usize,
24}
25
26impl BytesEncoder {
27 pub fn new(schema: Schema, col_index: usize) -> Self {
28 Self { schema, col_index }
29 }
30}
31
32impl RowEncoder for BytesEncoder {
33 type Output = Vec<u8>;
34
35 fn schema(&self) -> &risingwave_common::catalog::Schema {
36 &self.schema
37 }
38
39 fn col_indices(&self) -> Option<&[usize]> {
40 Some(std::slice::from_ref(&self.col_index))
41 }
42
43 fn encode_cols(
44 &self,
45 row: impl risingwave_common::row::Row,
46 col_indices: impl Iterator<Item = usize>,
47 ) -> crate::sink::Result<Self::Output> {
48 let mut result = Vec::new();
50 for col_index in col_indices {
51 let datum = row.datum_at(col_index);
52 let data_type = &self.schema.fields[col_index].data_type;
53 if data_type == &DataType::Bytea {
54 if let Some(scalar_impl) = datum {
55 result = scalar_impl.into_bytea().to_vec();
56 } else {
57 result = vec![];
58 }
59 } else {
60 return Err(crate::sink::SinkError::Encode(format!(
61 "Unsupported data type: expected bytea, got {}",
62 data_type
63 )));
64 }
65 }
66
67 Ok(result)
68 }
69}
70
71#[cfg(test)]
72mod test {
73 use risingwave_common::catalog::Field;
74 use risingwave_common::row::OwnedRow;
75 use risingwave_common::types::ScalarImpl;
76
77 use super::*;
78
79 #[test]
80 fn test_bytes_encoder_ser_bytes() {
81 let schema = Schema::new(vec![Field::with_name(DataType::Bytea, "col1")]);
82 let encoder = BytesEncoder::new(schema, 0);
83
84 let row = OwnedRow::new(vec![Some(ScalarImpl::Bytea(b"some_bytes".to_vec().into()))]);
85 assert_eq!(
86 encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
87 b"some_bytes".to_vec()
88 );
89
90 let row = OwnedRow::new(vec![None]);
91 assert_eq!(
92 encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
93 Vec::<u8>::new()
94 );
95
96 let schema = Schema::new(vec![Field::with_name(DataType::Int16, "col1")]);
97 let encoder = BytesEncoder::new(schema, 0);
98 let row = OwnedRow::new(vec![Some(ScalarImpl::Int16(123))]);
99 assert!(encoder.encode_cols(&row, std::iter::once(0)).is_err());
100 }
101}