risingwave_connector/sink/encoder/
mod.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono_tz::Tz;
19use risingwave_common::catalog::Schema;
20use risingwave_common::row::Row;
21
22use crate::sink::Result;
23
24mod avro;
25mod bson;
26pub mod bytes;
27mod json;
28mod proto;
29pub mod template;
30pub mod text;
31
32pub use avro::{AvroEncoder, AvroHeader};
33pub use bson::BsonEncoder;
34pub use json::JsonEncoder;
35pub use proto::{ProtoEncoder, ProtoHeader};
36
37pub trait RowEncoder {
42 type Output: SerTo<Vec<u8>>;
43
44 fn encode_cols(
45 &self,
46 row: impl Row,
47 col_indices: impl Iterator<Item = usize>,
48 ) -> Result<Self::Output>;
49 fn schema(&self) -> &Schema;
50 fn col_indices(&self) -> Option<&[usize]>;
51
52 fn encode(&self, row: impl Row) -> Result<Self::Output> {
53 assert_eq!(row.len(), self.schema().len());
54 match self.col_indices() {
55 Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
56 None => self.encode_cols(row, 0..self.schema().len()),
57 }
58 }
59}
60
61pub trait SerTo<T> {
75 fn ser_to(self) -> Result<T>;
76}
77
78impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
79 fn ser_to(self) -> Result<Vec<u8>> {
80 self.ser_to().map(|s: String| s.into_bytes())
81 }
82}
83
84impl<T> SerTo<T> for T {
85 fn ser_to(self) -> Result<T> {
86 Ok(self)
87 }
88}
89
/// Choice of representation for date values.
///
/// NOTE(review): what each variant actually produces is decided by the encoders that match on
/// it, which are not visible here; the per-variant notes are read off the names — confirm.
#[derive(Clone, Copy, Default)]
pub enum DateHandlingMode {
    /// Presumably a day count relative to the Common Era. This is the `Default` variant.
    #[default]
    FromCe,
    /// Presumably a day count relative to the Unix epoch.
    FromEpoch,
    /// Presumably a textual rendering of the date.
    String,
}
97
/// Choice of representation for timestamp values.
///
/// NOTE(review): variant meaning is inferred from the names (a millisecond number vs. text);
/// the encoders that interpret it are outside this view — confirm.
#[derive(Clone, Copy)]
pub enum TimestampHandlingMode {
    /// Presumably a numeric value in milliseconds.
    Milli,
    /// Presumably a textual rendering.
    String,
}
104
/// Choice of representation for time-of-day values.
///
/// NOTE(review): variant meaning is inferred from the names (a millisecond number vs. text);
/// the encoders that interpret it are outside this view — confirm.
#[derive(Clone, Copy)]
pub enum TimeHandlingMode {
    /// Presumably a numeric value in milliseconds.
    Milli,
    /// Presumably a textual rendering.
    String,
}
110
111#[derive(Clone, Copy, Default)]
112pub enum TimestamptzHandlingMode {
113 #[default]
114 UtcString,
115 UtcWithoutSuffix,
116 SpecifiedTimezoneWithoutSuffix(Tz),
117 Micro,
118 Milli,
119}
120
121impl TimestamptzHandlingMode {
122 pub const FRONTEND_DEFAULT: &'static str = "utc_string";
123 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
124
125 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
126 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
127 Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
128 Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
129 Some("micro") => Ok(Self::Micro),
130 Some("milli") => Ok(Self::Milli),
131 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
132 "unrecognized {} value {}",
133 Self::OPTION_KEY,
134 v
135 ))),
136 None => Ok(Self::UtcWithoutSuffix),
139 }
140 }
141}
142
/// Selects a target-specific flavour of JSON encoding.
///
/// NOTE(review): how each flavour changes the produced JSON is decided by the JSON encoder,
/// which is not visible here — confirm there before relying on these notes.
#[derive(Clone)]
pub enum CustomJsonType {
    /// Doris flavour, carrying a `String -> u8` map.
    ///
    /// NOTE(review): the meaning of the map's keys and values is not shown in this file
    /// (possibly a per-column numeric setting) — confirm against the Doris sink.
    Doris(HashMap<String, u8>),
    /// Elasticsearch flavour (going by the name).
    Es,
    /// StarRocks flavour.
    StarRocks,
    /// No target-specific customization.
    None,
}
155
/// Choice of representation for jsonb values.
///
/// `from_options` yields `String` for the option value `"string"` (and when the option is
/// absent) and `Dynamic` for `"dynamic"`.
///
/// Derives `Clone` and `Copy` like the other handling-mode enums in this module, so a mode can
/// be passed around by value and stored in several places.
///
/// NOTE(review): the encoded output of each variant is defined by the encoders, which are not
/// visible here; the per-variant notes are read off the names — confirm.
#[derive(Clone, Copy)]
pub enum JsonbHandlingMode {
    /// Presumably the jsonb value rendered as a string.
    String,
    /// Presumably the jsonb value emitted as a nested value of the target format.
    Dynamic,
}
164
165impl JsonbHandlingMode {
166 pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
167
168 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
169 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
170 Some("string") | None => Ok(Self::String),
171 Some("dynamic") => Ok(Self::Dynamic),
172 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
173 "unrecognized {} value {}",
174 Self::OPTION_KEY,
175 v
176 ))),
177 }
178 }
179}
180
/// Error raised while encoding a single field, together with the path of field names that
/// leads to it.
#[derive(Debug)]
struct FieldEncodeError {
    /// Human-readable description of what went wrong.
    message: String,
    /// Field names in the order they were pushed by `with_name`; `Display` prints them in
    /// reverse, so the name added last comes first.
    rev_path: Vec<String>,
}

impl FieldEncodeError {
    /// Creates an error with the given message and an empty path.
    fn new(message: impl std::fmt::Display) -> Self {
        Self {
            message: message.to_string(),
            rev_path: Vec::new(),
        }
    }

    /// Appends `name` to the recorded path and returns the error for further chaining.
    ///
    /// Because the path is printed in reverse, the name added last is displayed first.
    fn with_name(mut self, name: &str) -> Self {
        self.rev_path.push(name.to_owned());
        self
    }
}

impl std::fmt::Display for FieldEncodeError {
    /// Formats as `encode '<path>' error: <message>`, where `<path>` is the recorded names in
    /// reverse push order joined by `.` (empty when no name was recorded).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write the segments straight into the formatter instead of joining them into a
        // temporary `String` first; the produced text is the same.
        f.write_str("encode '")?;
        for (position, segment) in self.rev_path.iter().rev().enumerate() {
            if position > 0 {
                f.write_str(".")?;
            }
            f.write_str(segment)?;
        }
        write!(f, "' error: {}", self.message)
    }
}
213
214impl From<FieldEncodeError> for super::SinkError {
215 fn from(value: FieldEncodeError) -> Self {
216 Self::Encode(value.to_string())
217 }
218}
219
/// Parameters for Kafka Connect style output.
///
/// NOTE(review): how `schema_name` ends up in the encoded payload is decided by the encoders
/// that receive these parameters, which are not visible here — confirm.
#[derive(Clone)]
pub struct KafkaConnectParams {
    /// Name of the schema to report.
    pub schema_name: String,
}

/// Shared, reference-counted handle to [`KafkaConnectParams`].
type KafkaConnectParamsRef = Arc<KafkaConnectParams>;