risingwave_connector/sink/encoder/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono_tz::Tz;
19use risingwave_common::catalog::Schema;
20use risingwave_common::row::Row;
21
22use crate::sink::Result;
23
24mod avro;
25mod bson;
26pub mod bytes;
27mod json;
28mod proto;
29pub mod template;
30pub mod text;
31
32pub use avro::{AvroEncoder, AvroHeader};
33pub use bson::BsonEncoder;
34pub use json::JsonEncoder;
35pub use proto::{ProtoEncoder, ProtoHeader};
36
37/// Encode a row of a relation into
38/// * an object in json
39/// * a message in protobuf
40/// * a record in avro
41pub trait RowEncoder {
42    type Output: SerTo<Vec<u8>>;
43
44    fn encode_cols(
45        &self,
46        row: impl Row,
47        col_indices: impl Iterator<Item = usize>,
48    ) -> Result<Self::Output>;
49    fn schema(&self) -> &Schema;
50    fn col_indices(&self) -> Option<&[usize]>;
51
52    fn encode(&self, row: impl Row) -> Result<Self::Output> {
53        assert_eq!(row.len(), self.schema().len());
54        match self.col_indices() {
55            Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
56            None => self.encode_cols(row, 0..self.schema().len()),
57        }
58    }
59}
60
61/// Do the actual encoding from
62/// * an json object
63/// * a protobuf message
64/// * an avro record
65///   into
66/// * string (required by kinesis key)
67/// * bytes
68///
69/// This is like `TryInto` but allows us to `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
70///
71/// Note that `serde` does not fit here because its data model does not contain logical types.
72/// For example, although `chrono::DateTime` implements `Serialize`,
73/// it produces avro String rather than avro `TimestampMicros`.
74pub trait SerTo<T> {
75    fn ser_to(self) -> Result<T>;
76}
77
78impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
79    fn ser_to(self) -> Result<Vec<u8>> {
80        self.ser_to().map(|s: String| s.into_bytes())
81    }
82}
83
84impl<T> SerTo<T> for T {
85    fn ser_to(self) -> Result<T> {
86        Ok(self)
87    }
88}
89
/// Selects how date values are represented by an encoder.
///
/// The default is [`DateHandlingMode::FromCe`].
///
/// NOTE(review): the exact meaning of each variant is defined by the encoders
/// that match on this enum; presumably `FromCe` / `FromEpoch` are day counts
/// relative to the Common Era / the Unix epoch — confirm against those encoders.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DateHandlingMode {
    #[default]
    FromCe,
    FromEpoch,
    String,
}
97
/// Selects how timestamp values are represented by an encoder.
///
/// Useful for both json and protobuf.
///
/// NOTE(review): presumably `Milli` is a millisecond count and `String` a
/// textual rendering — confirm against the encoders that match on this enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimestampHandlingMode {
    Milli,
    String,
}
104
/// Selects how time values are represented by an encoder.
///
/// NOTE(review): presumably `Milli` is a millisecond count and `String` a
/// textual rendering — confirm against the encoders that match on this enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimeHandlingMode {
    Milli,
    String,
}
110
111#[derive(Clone, Copy, Default)]
112pub enum TimestamptzHandlingMode {
113    #[default]
114    UtcString,
115    UtcWithoutSuffix,
116    SpecifiedTimezoneWithoutSuffix(Tz),
117    Micro,
118    Milli,
119}
120
121impl TimestamptzHandlingMode {
122    pub const FRONTEND_DEFAULT: &'static str = "utc_string";
123    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
124
125    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
126        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
127            Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
128            Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
129            Some("micro") => Ok(Self::Micro),
130            Some("milli") => Ok(Self::Milli),
131            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
132                "unrecognized {} value {}",
133                Self::OPTION_KEY,
134                v
135            ))),
136            // This is not a good default. We just have to select it when no option is provided
137            // for compatibility with old version.
138            None => Ok(Self::UtcWithoutSuffix),
139        }
140    }
141}
142
/// Sink-specific adjustments applied by the JSON encoder.
#[derive(Clone, Debug)]
pub enum CustomJsonType {
    /// Doris needs dates encoded as strings.
    /// The internal order of a struct should follow the insertion order.
    /// Decimals need verification and calibration.
    ///
    /// NOTE(review): the map presumably carries per-column decimal information
    /// (column name -> scale?) — confirm against the JSON encoder.
    Doris(HashMap<String, u8>),
    /// Elasticsearch needs jsonb encoded as a struct.
    Es,
    /// StarRocks needs jsonb encoded as a struct.
    StarRocks,
    /// No sink-specific adjustment.
    None,
}
155
/// How the jsonb type is encoded.
///
/// - `String`: encode jsonb as string. `[1, true, "foo"] -> "[1, true, \"foo\"]"`
/// - `Dynamic`: encode jsonb as json type dynamically. `[1, true, "foo"] -> [1, true, "foo"]`
///
/// Derives `Clone` and `Copy` like the other handling-mode enums in this module,
/// so a mode can be passed around by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JsonbHandlingMode {
    String,
    Dynamic,
}
164
165impl JsonbHandlingMode {
166    pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
167
168    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
169        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
170            Some("string") | None => Ok(Self::String),
171            Some("dynamic") => Ok(Self::Dynamic),
172            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
173                "unrecognized {} value {}",
174                Self::OPTION_KEY,
175                v
176            ))),
177        }
178    }
179}
180
/// An encoding failure together with the path of the field that caused it.
#[derive(Debug)]
struct FieldEncodeError {
    /// Description of what went wrong.
    message: String,
    /// Field names in the order they were pushed; `Display` prints them
    /// reversed and joined with `.`.
    rev_path: Vec<String>,
}
186
187impl FieldEncodeError {
188    fn new(message: impl std::fmt::Display) -> Self {
189        Self {
190            message: message.to_string(),
191            rev_path: vec![],
192        }
193    }
194
195    fn with_name(mut self, name: &str) -> Self {
196        self.rev_path.push(name.into());
197        self
198    }
199}
200
201impl std::fmt::Display for FieldEncodeError {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        use itertools::Itertools;
204
205        write!(
206            f,
207            "encode '{}' error: {}",
208            self.rev_path.iter().rev().join("."),
209            self.message
210        )
211    }
212}
213
214impl From<FieldEncodeError> for super::SinkError {
215    fn from(value: FieldEncodeError) -> Self {
216        Self::Encode(value.to_string())
217    }
218}
219
/// Parameters for Kafka Connect compatible output.
///
/// NOTE(review): how `schema_name` is used is decided by the encoders that
/// receive these parameters — confirm there.
#[derive(Clone, Debug)]
pub struct KafkaConnectParams {
    /// Name of the schema.
    pub schema_name: String,
}
224
225type KafkaConnectParamsRef = Arc<KafkaConnectParams>;