risingwave_connector/sink/encoder/
text.rsuse risingwave_common::catalog::Schema;
use risingwave_common::types::{DataType, ToText};
use super::RowEncoder;
/// Row encoder whose output is a `String` holding the text form of a column value.
pub struct TextEncoder {
/// Schema of the rows handed to the encoder; each column's data type is read from it.
pub schema: Schema,
/// Index of the single column that `col_indices` reports.
pub col_index: usize,
}
impl TextEncoder {
pub fn new(schema: Schema, col_index: usize) -> Self {
Self { schema, col_index }
}
}
impl RowEncoder for TextEncoder {
    /// Encoded rows are produced as plain text.
    type Output = String;

    /// Returns the schema this encoder was constructed with.
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    /// Returns a one-element slice holding the configured `col_index`.
    fn col_indices(&self) -> Option<&[usize]> {
        Some(std::slice::from_ref(&self.col_index))
    }

    /// Encodes the requested columns of `row` as text.
    ///
    /// Every index yielded by `col_indices` is encoded in turn, but each result
    /// replaces the previous one, so the returned string is the text of the
    /// *last* index only. An empty iterator yields an empty string.
    ///
    /// Boolean columns are rendered through `bool::to_string`, with the literal
    /// `NULL` standing in for an absent value. Every other data type is
    /// delegated to `to_text_with_type`.
    ///
    /// This implementation never returns `Err`.
    // NOTE(review): `self.schema.fields[idx]` presumably panics when `idx` is
    // outside the schema — confirm against the type of `fields`.
    fn encode_cols(
        &self,
        row: impl risingwave_common::row::Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> crate::sink::Result<Self::Output> {
        // The accumulator is deliberately discarded on every step: the most
        // recently encoded column wins.
        let text = col_indices.fold(String::new(), |_previous, idx| {
            let value = row.datum_at(idx);
            let ty = &self.schema.fields[idx].data_type;
            if *ty == DataType::Boolean {
                match value {
                    Some(scalar) => scalar.into_bool().to_string(),
                    None => "NULL".to_string(),
                }
            } else {
                value.to_text_with_type(ty)
            }
        });
        Ok(text)
    }
}
#[cfg(test)]
mod test {
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::*;

    /// A boolean column encodes to `true` / `false`, and an absent value to `NULL`.
    #[test]
    fn test_text_encoder_ser_bool() {
        let schema = Schema::new(vec![Field::with_name(DataType::Boolean, "col1")]);
        let encoder = TextEncoder::new(schema, 0);

        // (datum placed in the single-column row, expected encoded text)
        let cases = vec![
            (Some(ScalarImpl::Bool(true)), "true"),
            (Some(ScalarImpl::Bool(false)), "false"),
            (None, "NULL"),
        ];

        for (datum, expected) in cases {
            let row = OwnedRow::new(vec![datum]);
            let encoded = encoder.encode_cols(&row, std::iter::once(0)).unwrap();
            assert_eq!(encoded.as_str(), expected);
        }
    }
}