risingwave_connector/sink/encoder/
mod.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::Schema;
19use risingwave_common::row::Row;
20
21use crate::sink::Result;
22
23mod avro;
24mod bson;
25pub mod bytes;
26mod json;
27mod proto;
28pub mod template;
29pub mod text;
30
31pub use avro::{AvroEncoder, AvroHeader};
32pub use bson::BsonEncoder;
33pub use json::JsonEncoder;
34pub use proto::{ProtoEncoder, ProtoHeader};
35
36pub trait RowEncoder {
41 type Output: SerTo<Vec<u8>>;
42
43 fn encode_cols(
44 &self,
45 row: impl Row,
46 col_indices: impl Iterator<Item = usize>,
47 ) -> Result<Self::Output>;
48 fn schema(&self) -> &Schema;
49 fn col_indices(&self) -> Option<&[usize]>;
50
51 fn encode(&self, row: impl Row) -> Result<Self::Output> {
52 assert_eq!(row.len(), self.schema().len());
53 match self.col_indices() {
54 Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
55 None => self.encode_cols(row, 0..self.schema().len()),
56 }
57 }
58}
59
60pub trait SerTo<T> {
74 fn ser_to(self) -> Result<T>;
75}
76
77impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
78 fn ser_to(self) -> Result<Vec<u8>> {
79 self.ser_to().map(|s: String| s.into_bytes())
80 }
81}
82
83impl<T> SerTo<T> for T {
84 fn ser_to(self) -> Result<T> {
85 Ok(self)
86 }
87}
88
/// Option describing how an encoder should handle date values.
///
/// NOTE(review): the precise output of each mode is decided by the encoders
/// that consume this enum, which are not visible from this file.
#[derive(Default, Clone, Copy)]
pub enum DateHandlingMode {
    /// The default mode. Presumably a count relative to the Common Era — confirm.
    #[default]
    FromCe,
    /// Presumably a count relative to the Unix epoch — confirm.
    FromEpoch,
    /// Presumably a textual rendering — confirm.
    String,
}
96
/// Option describing how an encoder should handle timestamp values.
///
/// NOTE(review): the exact output of each mode is defined by the consuming
/// encoders, which are not visible from this file.
#[derive(Copy, Clone)]
pub enum TimestampHandlingMode {
    /// Presumably a millisecond count — confirm.
    Milli,
    /// Presumably a textual rendering — confirm.
    String,
}
103
/// Option describing how an encoder should handle time-of-day values.
///
/// NOTE(review): the exact output of each mode is defined by the consuming
/// encoders, which are not visible from this file.
#[derive(Copy, Clone)]
pub enum TimeHandlingMode {
    /// Presumably a millisecond count — confirm.
    Milli,
    /// Presumably a textual rendering — confirm.
    String,
}
109
/// Option describing how an encoder should handle timestamptz values.
///
/// The variants correspond to the option values accepted by
/// `TimestamptzHandlingMode::from_options`.
#[derive(Default, Clone, Copy)]
pub enum TimestamptzHandlingMode {
    /// Selected by the option value `utc_string`; also the `Default` variant.
    #[default]
    UtcString,
    /// Selected by the option value `utc_without_suffix`.
    UtcWithoutSuffix,
    /// Selected by the option value `micro`.
    Micro,
    /// Selected by the option value `milli`.
    Milli,
}
118
119impl TimestamptzHandlingMode {
120 pub const FRONTEND_DEFAULT: &'static str = "utc_string";
121 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
122
123 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
124 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
125 Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
126 Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
127 Some("micro") => Ok(Self::Micro),
128 Some("milli") => Ok(Self::Milli),
129 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
130 "unrecognized {} value {}",
131 Self::OPTION_KEY,
132 v
133 ))),
134 None => Ok(Self::UtcWithoutSuffix),
137 }
138 }
139}
140
/// Selects a target-specific flavour of JSON encoding.
///
/// NOTE(review): what each flavour changes in the output is decided by the
/// JSON encoder, which is not visible from this file.
#[derive(Clone)]
pub enum CustomJsonType {
    /// Doris flavour, carrying a map from `String` keys to `u8` values.
    /// NOTE(review): the meaning of the keys and values is not visible here — confirm.
    Doris(HashMap<String, u8>),
    /// Presumably the Elasticsearch flavour — confirm.
    Es,
    /// StarRocks flavour.
    StarRocks,
    /// Presumably no target-specific customization — confirm.
    None,
}
153
/// Option describing how an encoder should handle jsonb values.
///
/// The variants correspond to the option values accepted by
/// `JsonbHandlingMode::from_options`. Derives `Clone` and `Copy` like the other
/// handling-mode enums in this module, so it can be stored and passed by value
/// the same way.
#[derive(Clone, Copy)]
pub enum JsonbHandlingMode {
    /// Selected by the option value `string`, and when the option is absent.
    /// Presumably encodes jsonb as a JSON string — confirm.
    String,
    /// Selected by the option value `dynamic`.
    /// Presumably encodes jsonb as a native JSON value — confirm.
    Dynamic,
}
162
163impl JsonbHandlingMode {
164 pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
165
166 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
167 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
168 Some("string") | None => Ok(Self::String),
169 Some("dynamic") => Ok(Self::Dynamic),
170 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
171 "unrecognized {} value {}",
172 Self::OPTION_KEY,
173 v
174 ))),
175 }
176 }
177}
178
/// Error raised while encoding a field, together with the path of field names
/// that leads to it.
#[derive(Debug)]
struct FieldEncodeError {
    /// Human-readable description of what went wrong.
    message: String,
    /// Field names in the order they were appended by `with_name`; the
    /// `Display` impl prints them in reverse, joined by `.`.
    rev_path: Vec<String>,
}
184
185impl FieldEncodeError {
186 fn new(message: impl std::fmt::Display) -> Self {
187 Self {
188 message: message.to_string(),
189 rev_path: vec![],
190 }
191 }
192
193 fn with_name(mut self, name: &str) -> Self {
194 self.rev_path.push(name.into());
195 self
196 }
197}
198
199impl std::fmt::Display for FieldEncodeError {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 use itertools::Itertools;
202
203 write!(
204 f,
205 "encode '{}' error: {}",
206 self.rev_path.iter().rev().join("."),
207 self.message
208 )
209 }
210}
211
212impl From<FieldEncodeError> for super::SinkError {
213 fn from(value: FieldEncodeError) -> Self {
214 Self::Encode(value.to_string())
215 }
216}
217
/// Parameters for Kafka Connect output.
///
/// NOTE(review): how these parameters are used is decided by the encoders,
/// which are not visible from this file.
#[derive(Clone)]
pub struct KafkaConnectParams {
    /// Schema name. Presumably emitted in the Kafka Connect schema envelope — confirm.
    pub schema_name: String,
}

/// Shared, reference-counted handle to [`KafkaConnectParams`].
type KafkaConnectParamsRef = Arc<KafkaConnectParams>;