risingwave_connector/sink/encoder/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::Schema;
19use risingwave_common::row::Row;
20
21use crate::sink::Result;
22
23mod avro;
24mod bson;
25pub mod bytes;
26mod json;
27mod proto;
28pub mod template;
29pub mod text;
30
31pub use avro::{AvroEncoder, AvroHeader};
32pub use bson::BsonEncoder;
33pub use json::JsonEncoder;
34pub use proto::{ProtoEncoder, ProtoHeader};
35
36/// Encode a row of a relation into
37/// * an object in json
38/// * a message in protobuf
39/// * a record in avro
40pub trait RowEncoder {
41    type Output: SerTo<Vec<u8>>;
42
43    fn encode_cols(
44        &self,
45        row: impl Row,
46        col_indices: impl Iterator<Item = usize>,
47    ) -> Result<Self::Output>;
48    fn schema(&self) -> &Schema;
49    fn col_indices(&self) -> Option<&[usize]>;
50
51    fn encode(&self, row: impl Row) -> Result<Self::Output> {
52        assert_eq!(row.len(), self.schema().len());
53        match self.col_indices() {
54            Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
55            None => self.encode_cols(row, 0..self.schema().len()),
56        }
57    }
58}
59
60/// Do the actual encoding from
61/// * an json object
62/// * a protobuf message
63/// * an avro record
64///   into
65/// * string (required by kinesis key)
66/// * bytes
67///
68/// This is like `TryInto` but allows us to `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
69///
70/// Note that `serde` does not fit here because its data model does not contain logical types.
71/// For example, although `chrono::DateTime` implements `Serialize`,
72/// it produces avro String rather than avro `TimestampMicros`.
73pub trait SerTo<T> {
74    fn ser_to(self) -> Result<T>;
75}
76
77impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
78    fn ser_to(self) -> Result<Vec<u8>> {
79        self.ser_to().map(|s: String| s.into_bytes())
80    }
81}
82
83impl<T> SerTo<T> for T {
84    fn ser_to(self) -> Result<T> {
85        Ok(self)
86    }
87}
88
89#[derive(Clone, Copy, Default)]
90pub enum DateHandlingMode {
91    #[default]
92    FromCe,
93    FromEpoch,
94    String,
95}
96
97/// Useful for both json and protobuf
98#[derive(Clone, Copy)]
99pub enum TimestampHandlingMode {
100    Milli,
101    String,
102}
103
104#[derive(Clone, Copy)]
105pub enum TimeHandlingMode {
106    Milli,
107    String,
108}
109
110#[derive(Clone, Copy, Default)]
111pub enum TimestamptzHandlingMode {
112    #[default]
113    UtcString,
114    UtcWithoutSuffix,
115    Micro,
116    Milli,
117}
118
119impl TimestamptzHandlingMode {
120    pub const FRONTEND_DEFAULT: &'static str = "utc_string";
121    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
122
123    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
124        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
125            Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
126            Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
127            Some("micro") => Ok(Self::Micro),
128            Some("milli") => Ok(Self::Milli),
129            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
130                "unrecognized {} value {}",
131                Self::OPTION_KEY,
132                v
133            ))),
134            // This is not a good default. We just have to select it when no option is provided
135            // for compatibility with old version.
136            None => Ok(Self::UtcWithoutSuffix),
137        }
138    }
139}
140
141#[derive(Clone)]
142pub enum CustomJsonType {
143    // Doris's json need date is string.
144    // The internal order of the struct should follow the insertion order.
145    // The decimal needs verification and calibration.
146    Doris(HashMap<String, u8>),
147    // Es's json need jsonb is struct
148    Es,
149    // starrocks' need jsonb is struct
150    StarRocks,
151    None,
152}
153
154/// How the jsonb type is encoded.
155///
156/// - `String`: encode jsonb as string. `[1, true, "foo"] -> "[1, true, \"foo\"]"`
157/// - `Dynamic`: encode jsonb as json type dynamically. `[1, true, "foo"] -> [1, true, "foo"]`
158pub enum JsonbHandlingMode {
159    String,
160    Dynamic,
161}
162
163impl JsonbHandlingMode {
164    pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
165
166    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
167        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
168            Some("string") | None => Ok(Self::String),
169            Some("dynamic") => Ok(Self::Dynamic),
170            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
171                "unrecognized {} value {}",
172                Self::OPTION_KEY,
173                v
174            ))),
175        }
176    }
177}
178
179#[derive(Debug)]
180struct FieldEncodeError {
181    message: String,
182    rev_path: Vec<String>,
183}
184
185impl FieldEncodeError {
186    fn new(message: impl std::fmt::Display) -> Self {
187        Self {
188            message: message.to_string(),
189            rev_path: vec![],
190        }
191    }
192
193    fn with_name(mut self, name: &str) -> Self {
194        self.rev_path.push(name.into());
195        self
196    }
197}
198
199impl std::fmt::Display for FieldEncodeError {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        use itertools::Itertools;
202
203        write!(
204            f,
205            "encode '{}' error: {}",
206            self.rev_path.iter().rev().join("."),
207            self.message
208        )
209    }
210}
211
212impl From<FieldEncodeError> for super::SinkError {
213    fn from(value: FieldEncodeError) -> Self {
214        Self::Encode(value.to_string())
215    }
216}
217
218#[derive(Clone)]
219pub struct KafkaConnectParams {
220    pub schema_name: String,
221}
222
223type KafkaConnectParamsRef = Arc<KafkaConnectParams>;