Skip to main content

risingwave_connector/sink/encoder/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use chrono_tz::Tz;
19use risingwave_common::catalog::Schema;
20use risingwave_common::row::Row;
21
22use crate::sink::Result;
23
24mod avro;
25mod bson;
26pub mod bytes;
27mod json;
28mod proto;
29pub mod template;
30pub mod text;
31
32pub use avro::{AvroEncoder, AvroHeader};
33pub use bson::BsonEncoder;
34pub use json::JsonEncoder;
35pub use proto::{ProtoEncoder, ProtoHeader};
36
37/// Encode a row of a relation into
38/// * an object in json
39/// * a message in protobuf
40/// * a record in avro
41pub trait RowEncoder {
42    type Output: SerTo<Vec<u8>>;
43
44    fn encode_cols(
45        &self,
46        row: impl Row,
47        col_indices: impl Iterator<Item = usize>,
48    ) -> Result<Self::Output>;
49    fn schema(&self) -> &Schema;
50    fn col_indices(&self) -> Option<&[usize]>;
51
52    fn encode(&self, row: impl Row) -> Result<Self::Output> {
53        assert_eq!(row.len(), self.schema().len());
54        match self.col_indices() {
55            Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
56            None => self.encode_cols(row, 0..self.schema().len()),
57        }
58    }
59}
60
61/// Do the actual encoding from
62/// * an json object
63/// * a protobuf message
64/// * an avro record
65///   into
66/// * string (required by kinesis key)
67/// * bytes
68///
69/// This is like `TryInto` but allows us to `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
70///
71/// Note that `serde` does not fit here because its data model does not contain logical types.
72/// For example, although `chrono::DateTime` implements `Serialize`,
73/// it produces avro String rather than avro `TimestampMicros`.
74pub trait SerTo<T> {
75    fn ser_to(self) -> Result<T>;
76}
77
78impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
79    fn ser_to(self) -> Result<Vec<u8>> {
80        self.ser_to().map(|s: String| s.into_bytes())
81    }
82}
83
84impl<T> SerTo<T> for T {
85    fn ser_to(self) -> Result<T> {
86        Ok(self)
87    }
88}
89
/// Selects how date values are represented in the encoded output.
///
/// NOTE(review): the variant names suggest a day count relative to the Common Era, a day
/// count relative to the Unix epoch, and a textual form respectively — confirm against the
/// encoders that consume this enum.
#[derive(Copy, Clone, Default)]
pub enum DateHandlingMode {
    /// The default mode.
    #[default]
    FromCe,
    FromEpoch,
    String,
}
97
/// Selects how timestamp values are represented in the encoded output.
///
/// Useful for both json and protobuf.
#[derive(Copy, Clone)]
pub enum TimestampHandlingMode {
    // NOTE(review): presumably a numeric millisecond value — confirm against the encoders.
    Milli,
    // NOTE(review): presumably the default textual rendering — confirm against the encoders.
    String,
    /// ISO 8601 string without a timezone suffix.
    Iso8601String,
}
106
/// Selects how time-of-day values are represented in the encoded output.
///
/// NOTE(review): the variant names suggest a numeric millisecond value and a textual form —
/// confirm against the encoders that consume this enum.
#[derive(Copy, Clone)]
pub enum TimeHandlingMode {
    Milli,
    String,
}
112
113#[derive(Clone, Copy, Default)]
114pub enum TimestamptzHandlingMode {
115    #[default]
116    UtcString,
117    UtcWithoutSuffix,
118    SpecifiedTimezoneWithoutSuffix(Tz),
119    Micro,
120    Milli,
121}
122
123impl TimestamptzHandlingMode {
124    pub const FRONTEND_DEFAULT: &'static str = "utc_string";
125    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
126
127    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
128        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
129            Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
130            Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
131            Some("micro") => Ok(Self::Micro),
132            Some("milli") => Ok(Self::Milli),
133            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
134                "unrecognized {} value {}",
135                Self::OPTION_KEY,
136                v
137            ))),
138            // This is not a good default. We just have to select it when no option is provided
139            // for compatibility with old version.
140            None => Ok(Self::UtcWithoutSuffix),
141        }
142    }
143}
144
/// Sink-specific adjustments applied on top of the plain json encoding.
#[derive(Clone)]
pub enum CustomJsonType {
    /// Doris needs dates encoded as strings.
    /// The internal order of a struct should follow the insertion order.
    /// Decimals need verification and calibration.
    // NOTE(review): the map presumably goes from column name to decimal scale — confirm
    // against the Doris sink.
    Doris(HashMap<String, u8>),
    /// Es needs jsonb encoded as a struct.
    Es,
    /// StarRocks needs jsonb encoded as a struct.
    StarRocks,
    /// Turbopuffer expects serial and decimal attributes to match `int` and `float` schema types.
    Turbopuffer,
    /// No sink-specific customization.
    None,
}
159
/// How the jsonb type is encoded.
///
/// - `String`: encode jsonb as string. `[1, true, "foo"] -> "[1, true, \"foo\"]"`
/// - `Dynamic`: encode jsonb as json type dynamically. `[1, true, "foo"] -> [1, true, "foo"]`
///
/// Like the other handling-mode enums in this module, this is a plain value type: it is
/// `Copy`, so it can be stored in encoder configs and passed around without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbHandlingMode {
    /// Encode jsonb as a json string containing the serialized document.
    String,
    /// Encode jsonb as the corresponding native json value.
    Dynamic,
}
168
169impl JsonbHandlingMode {
170    pub const OPTION_KEY: &'static str = "jsonb.handling.mode";
171
172    pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
173        match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
174            Some("string") | None => Ok(Self::String),
175            Some("dynamic") => Ok(Self::Dynamic),
176            Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
177                "unrecognized {} value {}",
178                Self::OPTION_KEY,
179                v
180            ))),
181        }
182    }
183}
184
/// Error raised while encoding a single (possibly nested) field.
///
/// The field path is collected innermost-first through [`FieldEncodeError::with_name`]
/// and rendered outermost-first, dot-separated, by the `Display` impl.
#[derive(Debug)]
struct FieldEncodeError {
    /// Description of what went wrong.
    message: String,
    /// Path segments in push order; reversed when displayed.
    rev_path: Vec<String>,
}

impl FieldEncodeError {
    /// Creates an error with the given message and an empty field path.
    fn new(message: impl std::fmt::Display) -> Self {
        let message = message.to_string();
        Self {
            message,
            rev_path: Vec::new(),
        }
    }

    /// Appends `name` to the recorded path and returns the error for chaining.
    ///
    /// Segments pushed later appear earlier in the displayed path.
    fn with_name(mut self, name: &str) -> Self {
        self.rev_path.push(name.to_owned());
        self
    }
}

impl std::fmt::Display for FieldEncodeError {
    /// Renders as `encode '<a.b.c>' error: <message>`, where the path is the recorded
    /// segments in reverse push order joined by `.`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let segments: Vec<&str> = self.rev_path.iter().rev().map(String::as_str).collect();
        let path = segments.join(".");
        write!(f, "encode '{}' error: {}", path, self.message)
    }
}
217
218impl From<FieldEncodeError> for super::SinkError {
219    fn from(value: FieldEncodeError) -> Self {
220        Self::Encode(value.to_string())
221    }
222}
223
/// Parameters for Kafka Connect compatible output.
// NOTE(review): how these are consumed is not visible in this file — confirm against the
// encoders that take a `KafkaConnectParamsRef`.
#[derive(Clone)]
pub struct KafkaConnectParams {
    /// Schema name to use.
    pub schema_name: String,
}

/// Shared, reference-counted handle to [`KafkaConnectParams`].
type KafkaConnectParamsRef = Arc<KafkaConnectParams>;