risingwave_connector/sink/encoder/bson.rs

use std::sync::LazyLock;

use anyhow::anyhow;
use mongodb::bson::spec::BinarySubtype;
use mongodb::bson::{Binary, Bson, DateTime, Document};
use risingwave_common::array::RowRef;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqDebug;
use thiserror_ext::AsReport;

use super::{Result as SinkResult, RowEncoder, SerTo};
use crate::sink::SinkError;

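/// Rate-limits the warnings emitted when a datum cannot be converted to BSON.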
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

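/// Encodes RisingWave rows into BSON [`Document`]s for the MongoDB sink.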
pub struct BsonEncoder {
    schema: Schema,
    col_indices: Option<Vec<usize>>,
    pk_indices: Vec<usize>,
}

impl BsonEncoder {
    pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, pk_indices: Vec<usize>) -> Self {
        Self {
            schema,
            col_indices,
            pk_indices,
        }
    }

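    /// Builds the BSON value identifying a row by its primary key: a
    /// single-column key maps directly to its BSON value, while a composite
    /// key becomes a sub-document keyed by field name.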
    pub fn construct_pk(&self, row: RowRef<'_>) -> Bson {
        if self.pk_indices.len() == 1 {
            let pk_field = &self.schema.fields[self.pk_indices[0]];
            let pk_datum = row.datum_at(self.pk_indices[0]);
            datum_to_bson(pk_field, pk_datum)
        } else {
            self.pk_indices
                .iter()
                .map(|&idx| {
                    let pk_field = &self.schema.fields[idx];
                    (
                        pk_field.name.clone(),
                        datum_to_bson(pk_field, row.datum_at(idx)),
                    )
                })
                .collect::<Document>()
                .into()
        }
    }
}

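/// Serializes a BSON [`Document`] into its binary (BSON wire) representation.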
impl SerTo<Vec<u8>> for Document {
    fn ser_to(self) -> SinkResult<Vec<u8>> {
        mongodb::bson::to_vec(&self).map_err(|err| {
            SinkError::Mongodb(anyhow!(err).context("cannot serialize Document to Vec<u8>"))
        })
    }
}

impl RowEncoder for BsonEncoder {
    type Output = Document;

    fn encode_cols(
        &self,
        row: impl Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> SinkResult<Self::Output> {
        Ok(col_indices
            .map(|idx| (&self.schema.fields[idx], row.datum_at(idx)))
            .map(|(field, datum)| (field.name.clone(), datum_to_bson(field, datum)))
            .collect())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn col_indices(&self) -> Option<&[usize]> {
        self.col_indices.as_ref().map(Vec::as_ref)
    }
}

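/// Converts a single datum into its BSON representation. `NULL` datums map to
/// `Bson::Null`; unsupported types and failed conversions also fall back to
/// `Bson::Null`, with a rate-limited warning.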
fn datum_to_bson(field: &Field, datum: DatumRef<'_>) -> Bson {
    let scalar_ref = match datum {
        None => {
            return Bson::Null;
        }
        Some(datum) => datum,
    };

    let data_type = field.data_type();

    match (data_type, scalar_ref) {
        (DataType::Int16, ScalarRefImpl::Int16(v)) => Bson::Int32(v as i32),
        (DataType::Int32, ScalarRefImpl::Int32(v)) => Bson::Int32(v),
        (DataType::Int64, ScalarRefImpl::Int64(v)) => Bson::Int64(v),
        (DataType::Int256, ScalarRefImpl::Int256(v)) => Bson::String(v.to_string()),
        (DataType::Float32, ScalarRefImpl::Float32(v)) => Bson::Double(v.into_inner() as f64),
        (DataType::Float64, ScalarRefImpl::Float64(v)) => Bson::Double(v.into_inner()),
        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => Bson::String(v.to_owned()),
        (DataType::Boolean, ScalarRefImpl::Bool(v)) => Bson::Boolean(v),
        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
            let decimal_str = v.to_string();
            let converted = decimal_str.parse();
            match converted {
                Ok(v) => Bson::Decimal128(v),
                Err(err) => {
                    if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                        tracing::warn!(
                            suppressed_count,
                            error = %err.as_report(),
                            ?field,
                            "risingwave decimal {} convert to bson decimal128 failed",
                            decimal_str,
                        );
                    }
                    Bson::Null
                }
            }
        }
        (DataType::Interval, ScalarRefImpl::Interval(v)) => Bson::String(v.to_string()),
        (DataType::Date, ScalarRefImpl::Date(v)) => Bson::String(v.to_string()),
        (DataType::Time, ScalarRefImpl::Time(v)) => Bson::String(v.to_string()),
        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
            Bson::DateTime(DateTime::from_millis(v.0.and_utc().timestamp_millis()))
        }
        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
            Bson::DateTime(DateTime::from_millis(v.timestamp_millis()))
        }
        (DataType::Jsonb, ScalarRefImpl::Jsonb(v)) => {
            let jsonb_val: JsonbVal = v.into();
            match jsonb_val.take().try_into() {
                Ok(doc) => doc,
                Err(err) => {
                    if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                        tracing::warn!(
                            suppressed_count,
                            error = %err.as_report(),
                            ?field,
                            "convert jsonb to mongodb bson failed",
                        );
                    }
                    Bson::Null
                }
            }
        }
        (DataType::Serial, ScalarRefImpl::Serial(v)) => Bson::Int64(v.into_inner()),
        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
            let mut doc = Document::new();
            for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                st.iter()
                    .map(|(name, dt)| Field::with_name(dt.clone(), name)),
            ) {
                doc.insert(
                    sub_field.name.clone(),
                    datum_to_bson(&sub_field, sub_datum_ref),
                );
            }
            Bson::Document(doc)
        }
        (DataType::List(dt), ScalarRefImpl::List(v)) => {
            let inner_field = Field::unnamed(Box::<DataType>::into_inner(dt));
            v.iter()
                .map(|scalar_ref| datum_to_bson(&inner_field, scalar_ref))
                .collect::<Bson>()
        }
        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => Bson::Binary(Binary {
            subtype: BinarySubtype::Generic,
            bytes: v.into(),
        }),
        _ => {
            if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                tracing::warn!(
                    suppressed_count,
                    ?field,
                    ?scalar_ref,
                    "datum_to_bson: unsupported data type"
                );
            }
            Bson::Null
        }
    }
}
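
// A minimal, hypothetical usage sketch of `encode_cols`: it assumes the usual
// `Schema::new` and `OwnedRow::new` constructors from `risingwave_common`, and
// the column names ("id", "name") are illustrative only.
#[cfg(test)]
mod tests {
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::*;

    #[test]
    fn encode_row_to_bson_document() {
        // Two-column schema with the first column as the primary key.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "id"),
            Field::with_name(DataType::Varchar, "name"),
        ]);
        let encoder = BsonEncoder::new(schema, None, vec![0]);

        // Encode one row into a BSON document and check the converted values.
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(42)),
            Some(ScalarImpl::Utf8("alice".into())),
        ]);
        let doc = encoder.encode_cols(row, 0..2).unwrap();

        assert_eq!(doc.get_i64("id").unwrap(), 42);
        assert_eq!(doc.get_str("name").unwrap(), "alice");
    }
}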