risingwave_connector/sink/encoder/
mod.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono_tz::Tz;
19use risingwave_common::catalog::Schema;
20use risingwave_common::row::Row;
21
22use crate::sink::Result;
23
24mod avro;
25mod bson;
26pub mod bytes;
27mod json;
28mod proto;
29pub mod template;
30pub mod text;
31
32pub use avro::{AvroEncoder, AvroHeader};
33pub use bson::BsonEncoder;
34pub use json::JsonEncoder;
35pub use proto::{ProtoEncoder, ProtoHeader};
36
37pub trait RowEncoder {
42 type Output: SerTo<Vec<u8>>;
43
44 fn encode_cols(
45 &self,
46 row: impl Row,
47 col_indices: impl Iterator<Item = usize>,
48 ) -> Result<Self::Output>;
49 fn schema(&self) -> &Schema;
50 fn col_indices(&self) -> Option<&[usize]>;
51
52 fn encode(&self, row: impl Row) -> Result<Self::Output> {
53 assert_eq!(row.len(), self.schema().len());
54 match self.col_indices() {
55 Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
56 None => self.encode_cols(row, 0..self.schema().len()),
57 }
58 }
59}
60
61pub trait SerTo<T> {
75 fn ser_to(self) -> Result<T>;
76}
77
78impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
79 fn ser_to(self) -> Result<Vec<u8>> {
80 self.ser_to().map(|s: String| s.into_bytes())
81 }
82}
83
84impl<T> SerTo<T> for T {
85 fn ser_to(self) -> Result<T> {
86 Ok(self)
87 }
88}
89
/// Selects how date values are represented by an encoder.
///
/// The default is [`DateHandlingMode::FromCe`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DateHandlingMode {
    /// Default mode.
    // NOTE(review): presumably a day count relative to the start of the
    // Common Era — confirm against the encoders that consume this.
    #[default]
    FromCe,
    // NOTE(review): presumably a day count relative to the Unix epoch — confirm.
    /// Alternative numeric mode.
    FromEpoch,
    // NOTE(review): presumably a textual rendering of the date — confirm.
    /// String mode.
    String,
}
97
/// Selects how timestamp values are represented by an encoder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimestampHandlingMode {
    // NOTE(review): presumably a millisecond count — confirm against the encoders.
    /// Millisecond mode.
    Milli,
    // NOTE(review): presumably a plain textual rendering — confirm.
    /// String mode.
    String,
    // NOTE(review): presumably an ISO 8601 formatted string — confirm.
    /// ISO 8601 string mode.
    Iso8601String,
}
106
/// Selects how time-of-day values are represented by an encoder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimeHandlingMode {
    // NOTE(review): presumably a millisecond count — confirm against the encoders.
    /// Millisecond mode.
    Milli,
    // NOTE(review): presumably a textual rendering — confirm.
    /// String mode.
    String,
}
112
113#[derive(Clone, Copy, Default)]
114pub enum TimestamptzHandlingMode {
115 #[default]
116 UtcString,
117 UtcWithoutSuffix,
118 SpecifiedTimezoneWithoutSuffix(Tz),
119 Micro,
120 Milli,
121}
122
123impl TimestamptzHandlingMode {
124 pub const FRONTEND_DEFAULT: &'static str = "utc_string";
125 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
126
127 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
128 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
129 Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
130 Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
131 Some("micro") => Ok(Self::Micro),
132 Some("milli") => Ok(Self::Milli),
133 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
134 "unrecognized {} value {}",
135 Self::OPTION_KEY,
136 v
137 ))),
138 None => Ok(Self::UtcWithoutSuffix),
141 }
142 }
143}
144
/// Target-specific JSON flavour requested from the JSON encoder.
#[derive(Clone, Debug)]
pub enum CustomJsonType {
    /// Doris flavour.
    // NOTE(review): the map presumably associates column names with a
    // per-column numeric setting (e.g. a decimal scale) — confirm against
    // the Doris sink.
    Doris(HashMap<String, u8>),
    // NOTE(review): presumably Elasticsearch — confirm.
    /// `Es` flavour.
    Es,
    /// StarRocks flavour.
    StarRocks,
    /// Turbopuffer flavour.
    Turbopuffer,
    /// No target-specific customization.
    None,
}
159
/// Selects how jsonb values are represented by an encoder.
///
/// Parsed from the `jsonb.handling.mode` option, where `"string"` (or an
/// absent option) selects [`JsonbHandlingMode::String`] and `"dynamic"`
/// selects [`JsonbHandlingMode::Dynamic`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JsonbHandlingMode {
    // NOTE(review): presumably the jsonb value is emitted as a JSON string — confirm.
    /// String mode.
    String,
    // NOTE(review): presumably the jsonb value is emitted as a nested JSON value — confirm.
    /// Dynamic mode.
    Dynamic,
}
168
169impl JsonbHandlingMode {
170 pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
171
172 pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
173 match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
174 Some("string") | None => Ok(Self::String),
175 Some("dynamic") => Ok(Self::Dynamic),
176 Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
177 "unrecognized {} value {}",
178 Self::OPTION_KEY,
179 v
180 ))),
181 }
182 }
183}
184
/// Error describing a failure to encode one field, together with the names
/// of the fields that enclose it.
#[derive(Debug)]
struct FieldEncodeError {
    /// Description of what went wrong.
    message: String,
    /// Field names in the order they were attached via `with_name`; the
    /// `Display` implementation prints them in reverse, joined by `.`.
    rev_path: Vec<String>,
}

impl FieldEncodeError {
    /// Creates an error with the given message and an empty path.
    fn new(message: impl std::fmt::Display) -> Self {
        Self {
            rev_path: Vec::new(),
            message: message.to_string(),
        }
    }

    /// Appends `name` to the recorded path and returns the updated error.
    fn with_name(self, name: &str) -> Self {
        let mut annotated = self;
        annotated.rev_path.push(String::from(name));
        annotated
    }
}
204
205impl std::fmt::Display for FieldEncodeError {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 use itertools::Itertools;
208
209 write!(
210 f,
211 "encode '{}' error: {}",
212 self.rev_path.iter().rev().join("."),
213 self.message
214 )
215 }
216}
217
218impl From<FieldEncodeError> for super::SinkError {
219 fn from(value: FieldEncodeError) -> Self {
220 Self::Encode(value.to_string())
221 }
222}
223
/// Parameters carried alongside an encoder.
// NOTE(review): judging by the name these relate to Kafka Connect compatible
// output — confirm against the encoders that accept them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KafkaConnectParams {
    /// Schema name.
    // NOTE(review): presumably embedded in the emitted Kafka Connect schema — confirm.
    pub schema_name: String,
}

/// Shared, reference-counted handle to [`KafkaConnectParams`].
type KafkaConnectParamsRef = Arc<KafkaConnectParams>;