risingwave_connector/sink/encoder/
text.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::Schema;
16use risingwave_common::types::{DataType, ToText};
17
18use super::RowEncoder;
19
20/// Encode with [`ToText`]. Only used to encode key.
21pub struct TextEncoder {
22    pub schema: Schema,
23    // the column must contain only one element
24    pub col_index: usize,
25}
26
27impl TextEncoder {
28    pub fn new(schema: Schema, col_index: usize) -> Self {
29        Self { schema, col_index }
30    }
31}
32
33impl RowEncoder for TextEncoder {
34    type Output = String;
35
36    fn schema(&self) -> &risingwave_common::catalog::Schema {
37        &self.schema
38    }
39
40    fn col_indices(&self) -> Option<&[usize]> {
41        Some(std::slice::from_ref(&self.col_index))
42    }
43
44    fn encode_cols(
45        &self,
46        row: impl risingwave_common::row::Row,
47        col_indices: impl Iterator<Item = usize>,
48    ) -> crate::sink::Result<Self::Output> {
49        // It is guaranteed by the caller that col_indices contains only one element
50        let mut result = String::new();
51        for col_index in col_indices {
52            let datum = row.datum_at(col_index);
53            let data_type = &self.schema.fields[col_index].data_type;
54            if data_type == &DataType::Boolean {
55                result = if let Some(scalar_impl) = datum {
56                    scalar_impl.into_bool().to_string()
57                } else {
58                    "NULL".to_owned()
59                }
60            } else {
61                result = datum.to_text_with_type(data_type);
62            }
63        }
64
65        Ok(result)
66    }
67}
68
69#[cfg(test)]
70mod test {
71    use risingwave_common::catalog::Field;
72    use risingwave_common::row::OwnedRow;
73    use risingwave_common::types::ScalarImpl;
74
75    use super::*;
76
77    #[test]
78    fn test_text_encoder_ser_bool() {
79        let schema = Schema::new(vec![Field::with_name(DataType::Boolean, "col1")]);
80        let encoder = TextEncoder::new(schema, 0);
81
82        let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(true))]);
83        assert_eq!(
84            encoder
85                .encode_cols(&row, std::iter::once(0))
86                .unwrap()
87                .as_str(),
88            "true"
89        );
90
91        let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(false))]);
92        assert_eq!(
93            encoder
94                .encode_cols(&row, std::iter::once(0))
95                .unwrap()
96                .as_str(),
97            "false"
98        );
99
100        let row = OwnedRow::new(vec![None]);
101        assert_eq!(
102            encoder
103                .encode_cols(&row, std::iter::once(0))
104                .unwrap()
105                .as_str(),
106            "NULL"
107        );
108    }
109}