risingwave_connector/sink/encoder/
text.rs1use risingwave_common::catalog::Schema;
16use risingwave_common::types::{DataType, ToText};
17
18use super::RowEncoder;
19
20pub struct TextEncoder {
22 pub schema: Schema,
23 pub col_index: usize,
25}
26
27impl TextEncoder {
28 pub fn new(schema: Schema, col_index: usize) -> Self {
29 Self { schema, col_index }
30 }
31}
32
33impl RowEncoder for TextEncoder {
34 type Output = String;
35
36 fn schema(&self) -> &risingwave_common::catalog::Schema {
37 &self.schema
38 }
39
40 fn col_indices(&self) -> Option<&[usize]> {
41 Some(std::slice::from_ref(&self.col_index))
42 }
43
44 fn encode_cols(
45 &self,
46 row: impl risingwave_common::row::Row,
47 col_indices: impl Iterator<Item = usize>,
48 ) -> crate::sink::Result<Self::Output> {
49 let mut result = String::new();
51 for col_index in col_indices {
52 let datum = row.datum_at(col_index);
53 let data_type = &self.schema.fields[col_index].data_type;
54 if data_type == &DataType::Boolean {
55 result = if let Some(scalar_impl) = datum {
56 scalar_impl.into_bool().to_string()
57 } else {
58 "NULL".to_owned()
59 }
60 } else {
61 result = datum.to_text_with_type(data_type);
62 }
63 }
64
65 Ok(result)
66 }
67}
68
69#[cfg(test)]
70mod test {
71 use risingwave_common::catalog::Field;
72 use risingwave_common::row::OwnedRow;
73 use risingwave_common::types::ScalarImpl;
74
75 use super::*;
76
77 #[test]
78 fn test_text_encoder_ser_bool() {
79 let schema = Schema::new(vec![Field::with_name(DataType::Boolean, "col1")]);
80 let encoder = TextEncoder::new(schema, 0);
81
82 let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(true))]);
83 assert_eq!(
84 encoder
85 .encode_cols(&row, std::iter::once(0))
86 .unwrap()
87 .as_str(),
88 "true"
89 );
90
91 let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(false))]);
92 assert_eq!(
93 encoder
94 .encode_cols(&row, std::iter::once(0))
95 .unwrap()
96 .as_str(),
97 "false"
98 );
99
100 let row = OwnedRow::new(vec![None]);
101 assert_eq!(
102 encoder
103 .encode_cols(&row, std::iter::once(0))
104 .unwrap()
105 .as_str(),
106 "NULL"
107 );
108 }
109}