// risingwave_connector/sink/encoder/mod.rs
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use crate::sink::Result;
mod avro;
mod bson;
pub mod bytes;
mod json;
mod proto;
pub mod template;
pub mod text;
pub use avro::{AvroEncoder, AvroHeader};
pub use bson::BsonEncoder;
pub use json::JsonEncoder;
pub use proto::{ProtoEncoder, ProtoHeader};
/// Encodes a row into an encoder-specific output that can be turned into bytes.
pub trait RowEncoder {
    /// Encoded form of a row; must be convertible to `Vec<u8>` through [`SerTo`].
    type Output: SerTo<Vec<u8>>;

    /// Encodes the columns of `row` selected by `col_indices`.
    fn encode_cols(
        &self,
        row: impl Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> Result<Self::Output>;

    /// Schema this encoder works against; [`RowEncoder::encode`] checks the row
    /// width against its length.
    fn schema(&self) -> &Schema;

    /// Column selection used by [`RowEncoder::encode`]; `None` selects every
    /// schema column in order.
    fn col_indices(&self) -> Option<&[usize]>;

    /// Encodes `row`, restricted to [`RowEncoder::col_indices`] when present and
    /// covering columns `0..schema().len()` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `row.len()` differs from `self.schema().len()`.
    fn encode(&self, row: impl Row) -> Result<Self::Output> {
        let width = self.schema().len();
        assert_eq!(row.len(), width);
        if let Some(selected) = self.col_indices() {
            self.encode_cols(row, selected.iter().copied())
        } else {
            self.encode_cols(row, 0..width)
        }
    }
}
/// Conversion of an encoder's output into the final representation `T`.
///
/// `RowEncoder::Output` is required to implement `SerTo<Vec<u8>>`.
pub trait SerTo<T> {
/// Consumes `self` and produces a `T`, or an error if the conversion fails.
fn ser_to(self) -> Result<T>;
}
/// Anything that serializes to a `String` also serializes to bytes, by taking
/// the string's underlying byte buffer.
impl<T> SerTo<Vec<u8>> for T
where
    T: SerTo<String>,
{
    /// Serializes to a `String` first, then converts it into its bytes.
    fn ser_to(self) -> Result<Vec<u8>> {
        // Name the `String` target explicitly so the call cannot resolve to
        // another `SerTo` impl.
        <T as SerTo<String>>::ser_to(self).map(String::into_bytes)
    }
}
/// Identity conversion: every value trivially serializes to its own type.
impl<T> SerTo<T> for T {
/// Returns `self` unchanged; this conversion never fails.
fn ser_to(self) -> Result<T> {
Ok(self)
}
}
/// Selects how date values are represented in encoded output.
///
/// NOTE(review): the variant semantics below are inferred from their names only;
/// confirm against the encoders that consume this mode.
#[derive(Clone, Copy, Default)]
pub enum DateHandlingMode {
/// The `Default` variant. Presumably a day count relative to the Common Era (TODO confirm).
#[default]
FromCe,
/// Presumably a day count relative to the Unix epoch (TODO confirm).
FromEpoch,
/// Presumably the date rendered as text (TODO confirm).
String,
}
/// Selects how timestamp values are represented in encoded output.
///
/// NOTE(review): the variant semantics below are inferred from their names only;
/// confirm against the encoders that consume this mode.
#[derive(Clone, Copy)]
pub enum TimestampHandlingMode {
/// Presumably a millisecond count (TODO confirm the reference point).
Milli,
/// Presumably the timestamp rendered as text (TODO confirm).
String,
}
/// Selects how time-of-day values are represented in encoded output.
///
/// NOTE(review): the variant semantics below are inferred from their names only;
/// confirm against the encoders that consume this mode.
#[derive(Clone, Copy)]
pub enum TimeHandlingMode {
/// Presumably a millisecond count (TODO confirm the reference point).
Milli,
/// Presumably the time rendered as text (TODO confirm).
String,
}
/// Selects how timestamp-with-time-zone values are represented in encoded output.
///
/// The mode is normally resolved from sink options by `from_options`, which maps
/// the option values noted on each variant.
#[derive(Clone, Copy, Default)]
pub enum TimestamptzHandlingMode {
/// The `Default` variant; selected by the option value `"utc_string"`.
#[default]
UtcString,
/// Selected by the option value `"utc_without_suffix"`, and also used by
/// `from_options` when the option is absent.
UtcWithoutSuffix,
/// Selected by the option value `"micro"`. Presumably a microsecond count (TODO confirm).
Micro,
/// Selected by the option value `"milli"`. Presumably a millisecond count (TODO confirm).
Milli,
}
impl TimestamptzHandlingMode {
    /// Option value that selects [`Self::UtcString`].
    pub const FRONTEND_DEFAULT: &'static str = "utc_string";
    /// Key under which the mode is looked up in the options map.
    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";

    /// Resolves the handling mode from `options`.
    ///
    /// Recognized values for [`Self::OPTION_KEY`] are `"utc_string"`,
    /// `"utc_without_suffix"`, `"micro"` and `"milli"`.
    ///
    /// # Errors
    ///
    /// Returns `SinkError::Config` when the key is present with any other value.
    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
        let raw = match options.get(Self::OPTION_KEY) {
            Some(value) => value.as_str(),
            // NOTE(review): when the key is absent the result is `UtcWithoutSuffix`,
            // which differs from both the `#[default]` variant and
            // `FRONTEND_DEFAULT`. This looks deliberate (possibly for
            // compatibility); confirm before changing.
            None => return Ok(Self::UtcWithoutSuffix),
        };

        let mode = match raw {
            Self::FRONTEND_DEFAULT => Self::UtcString,
            "utc_without_suffix" => Self::UtcWithoutSuffix,
            "micro" => Self::Micro,
            "milli" => Self::Milli,
            unknown => {
                return Err(super::SinkError::Config(anyhow::anyhow!(
                    "unrecognized {} value {}",
                    Self::OPTION_KEY,
                    unknown
                )));
            }
        };
        Ok(mode)
    }
}
/// Selects sink-specific JSON encoding behavior.
///
/// NOTE(review): what each variant changes in the output is not visible in this
/// module; confirm against the JSON encoder.
#[derive(Clone)]
pub enum CustomJsonType {
/// Doris-specific handling, carrying a `String -> u8` map.
/// NOTE(review): presumably column name -> decimal scale (TODO confirm).
Doris(HashMap<String, u8>),
/// Presumably Elasticsearch-specific handling (TODO confirm).
Es,
/// Presumably StarRocks-specific handling (TODO confirm).
StarRocks,
/// Presumably no sink-specific customization (TODO confirm).
None,
}
/// Selects how jsonb values are represented in encoded output.
///
/// The mode is normally resolved from sink options by `from_options`, which maps
/// the option values noted on each variant.
///
/// Derives `Clone` and `Copy` like the other handling-mode enums in this module,
/// so the mode can be passed by value and stored without moving it away.
#[derive(Clone, Copy)]
pub enum JsonbHandlingMode {
    /// Selected by the option value `"string"`, or when the option is absent.
    String,
    /// Selected by the option value `"dynamic"`.
    Dynamic,
}
impl JsonbHandlingMode {
    /// Key under which the mode is looked up in the options map.
    pub const OPTION_KEY: &'static str = "jsonb.handling.mode";

    /// Resolves the handling mode from `options`.
    ///
    /// An absent key and the value `"string"` both yield [`Self::String`];
    /// `"dynamic"` yields [`Self::Dynamic`].
    ///
    /// # Errors
    ///
    /// Returns `SinkError::Config` when the key is present with any other value.
    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
        let raw = match options.get(Self::OPTION_KEY) {
            None => return Ok(Self::String),
            Some(value) => value.as_str(),
        };

        match raw {
            "string" => Ok(Self::String),
            "dynamic" => Ok(Self::Dynamic),
            unknown => Err(super::SinkError::Config(anyhow::anyhow!(
                "unrecognized {} value {}",
                Self::OPTION_KEY,
                unknown
            ))),
        }
    }
}
/// Error raised while encoding a single field, together with the path of field
/// names leading to it.
///
/// Names are stored in the order they are attached through `with_name`, and the
/// `Display` impl prints them reversed. The first name attached is therefore
/// rendered last (presumably the innermost field, attached as the error bubbles
/// outward; confirm against callers).
#[derive(Debug)]
struct FieldEncodeError {
    /// Human-readable description of what went wrong.
    message: String,
    /// Field names in attachment order, i.e. reversed relative to display order.
    rev_path: Vec<String>,
}

impl FieldEncodeError {
    /// Creates an error with the given message and an empty path.
    fn new(message: impl std::fmt::Display) -> Self {
        let message = message.to_string();
        let rev_path = Vec::new();
        Self { message, rev_path }
    }

    /// Appends `name` to the stored path and returns the updated error.
    fn with_name(self, name: &str) -> Self {
        let Self {
            message,
            mut rev_path,
        } = self;
        rev_path.push(String::from(name));
        Self { message, rev_path }
    }
}
/// Renders the error as `encode '<path>' error: <message>`, where `<path>` is
/// the stored names in reverse order joined by `.`.
impl std::fmt::Display for FieldEncodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use itertools::Itertools;

        // `rev_path` is stored in attachment order; reverse it for display.
        let dotted_path = self.rev_path.iter().rev().join(".");
        let message = &self.message;
        write!(f, "encode '{}' error: {}", dotted_path, message)
    }
}
/// Converts a field-level encode error into `SinkError::Encode`, carrying the
/// error's `Display` rendering as the payload.
impl From<FieldEncodeError> for super::SinkError {
    fn from(value: FieldEncodeError) -> Self {
        let rendered = value.to_string();
        Self::Encode(rendered)
    }
}
/// Parameters carried for Kafka Connect-related encoding.
///
/// NOTE(review): how encoders use these parameters is not visible in this
/// module; confirm against the consumers of `KafkaConnectParamsRef`.
#[derive(Clone)]
pub struct KafkaConnectParams {
/// Schema name. Presumably the name emitted in the Kafka Connect schema (TODO confirm).
pub schema_name: String,
}
/// Shared, reference-counted handle to [`KafkaConnectParams`].
type KafkaConnectParamsRef = Arc<KafkaConnectParams>;