risingwave_connector/sink/encoder/
bson.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use anyhow::anyhow;
18use mongodb::bson::spec::BinarySubtype;
19use mongodb::bson::{Binary, Bson, DateTime, Document};
20use risingwave_common::array::RowRef;
21use risingwave_common::catalog::{Field, Schema};
22use risingwave_common::log::LogSuppresser;
23use risingwave_common::row::Row;
24use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl};
25use risingwave_common::util::iter_util::ZipEqDebug;
26use thiserror_ext::AsReport;
27
28use super::{Result as SinkResult, RowEncoder, SerTo};
29use crate::sink::SinkError;
30
/// Rate-limits the conversion warnings emitted by `datum_to_bson` so that a
/// column full of bad values does not flood the logs.
/// NOTE(review): the name has a long-standing typo (SUPPERSSER); kept as-is
/// because it is referenced throughout this file.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
32
/// Row encoder that converts RisingWave rows into BSON [`Document`]s for the
/// MongoDB sink.
pub struct BsonEncoder {
    // Schema of the rows being encoded; field names become document keys.
    schema: Schema,
    // Optional subset of column indices to encode; `None` means all columns.
    col_indices: Option<Vec<usize>>,
    // Indices of the primary-key columns, used by `construct_pk` to build the
    // key value for a row.
    pk_indices: Vec<usize>,
}
38
39impl BsonEncoder {
40    pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, pk_indices: Vec<usize>) -> Self {
41        Self {
42            schema,
43            col_indices,
44            pk_indices,
45        }
46    }
47
48    pub fn construct_pk(&self, row: RowRef<'_>) -> Bson {
49        if self.pk_indices.len() == 1 {
50            let pk_field = &self.schema.fields[self.pk_indices[0]];
51            let pk_datum = row.datum_at(self.pk_indices[0]);
52            datum_to_bson(pk_field, pk_datum)
53        } else {
54            self.pk_indices
55                .iter()
56                .map(|&idx| {
57                    let pk_field = &self.schema.fields[idx];
58                    (
59                        pk_field.name.clone(),
60                        datum_to_bson(pk_field, row.datum_at(idx)),
61                    )
62                })
63                .collect::<Document>()
64                .into()
65        }
66    }
67}
68
69impl SerTo<Vec<u8>> for Document {
70    fn ser_to(self) -> SinkResult<Vec<u8>> {
71        mongodb::bson::to_vec(&self).map_err(|err| {
72            SinkError::Mongodb(anyhow!(err).context("cannot serialize Document to Vec<u8>"))
73        })
74    }
75}
76
77impl RowEncoder for BsonEncoder {
78    type Output = Document;
79
80    fn encode_cols(
81        &self,
82        row: impl Row,
83        col_indices: impl Iterator<Item = usize>,
84    ) -> SinkResult<Self::Output> {
85        Ok(col_indices
86            .map(|idx| (&self.schema.fields[idx], row.datum_at(idx)))
87            .map(|(field, datum)| (field.name.clone(), datum_to_bson(field, datum)))
88            .collect())
89    }
90
91    fn schema(&self) -> &Schema {
92        &self.schema
93    }
94
95    fn col_indices(&self) -> Option<&[usize]> {
96        self.col_indices.as_ref().map(Vec::as_ref)
97    }
98}
99
/// Converts a RisingWave datum to a [`Bson`] value according to the field's
/// declared data type.
///
/// Types without a native BSON counterpart (`Int256`, `Interval`, `Date`,
/// `Time`) are rendered as their string representation. If a conversion fails
/// or the (type, scalar) combination is unsupported, a rate-limited warning is
/// logged and `Bson::Null` is returned instead of failing the sink.
fn datum_to_bson(field: &Field, datum: DatumRef<'_>) -> Bson {
    // A NULL datum always maps to BSON null, regardless of the declared type.
    let scalar_ref = match datum {
        None => {
            return Bson::Null;
        }
        Some(datum) => datum,
    };

    let data_type = field.data_type();

    // Match on the (declared type, actual scalar) pair; a mismatch between the
    // two falls through to the catch-all arm at the bottom.
    match (data_type, scalar_ref) {
        // BSON has no 16-bit integer type, so Int16 is widened to Int32.
        (DataType::Int16, ScalarRefImpl::Int16(v)) => Bson::Int32(v as i32),
        (DataType::Int32, ScalarRefImpl::Int32(v)) => Bson::Int32(v),
        (DataType::Int64, ScalarRefImpl::Int64(v)) => Bson::Int64(v),
        // Int256 exceeds every BSON numeric type; keep its string form.
        (DataType::Int256, ScalarRefImpl::Int256(v)) => Bson::String(v.to_string()),
        (DataType::Float32, ScalarRefImpl::Float32(v)) => Bson::Double(v.into_inner() as f64),
        (DataType::Float64, ScalarRefImpl::Float64(v)) => Bson::Double(v.into_inner()),
        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => Bson::String(v.to_owned()),
        (DataType::Boolean, ScalarRefImpl::Bool(v)) => Bson::Boolean(v),
        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
            // Round-trip through the string form to build a BSON Decimal128;
            // values Decimal128 cannot represent fail the parse and become Null.
            let decimal_str = v.to_string();
            let converted = decimal_str.parse();
            match converted {
                Ok(v) => Bson::Decimal128(v),
                Err(err) => {
                    // Rate-limited warning; never fails the encode.
                    if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                        tracing::warn!(
                            suppressed_count,
                            error = %err.as_report(),
                            ?field,
                            "risingwave decimal {} convert to bson decimal128 failed",
                            decimal_str,
                        );
                    }
                    Bson::Null
                }
            }
        }
        // Temporal types without a direct BSON counterpart use string form.
        (DataType::Interval, ScalarRefImpl::Interval(v)) => Bson::String(v.to_string()),
        (DataType::Date, ScalarRefImpl::Date(v)) => Bson::String(v.to_string()),
        (DataType::Time, ScalarRefImpl::Time(v)) => Bson::String(v.to_string()),
        // Naive timestamps are interpreted as UTC (`and_utc`) and encoded as a
        // BSON DateTime with millisecond precision.
        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
            Bson::DateTime(DateTime::from_millis(v.0.and_utc().timestamp_millis()))
        }
        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
            Bson::DateTime(DateTime::from_millis(v.timestamp_millis()))
        }
        (DataType::Jsonb, ScalarRefImpl::Jsonb(v)) => {
            // Take ownership of the jsonb value and convert it into a Bson
            // tree; on failure, warn (rate-limited) and fall back to Null.
            let jsonb_val: JsonbVal = v.into();
            match jsonb_val.take().try_into() {
                Ok(doc) => doc,
                Err(err) => {
                    if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                        tracing::warn!(
                            suppressed_count,
                            error = %err.as_report(),
                            ?field,
                            "convert jsonb to mongodb bson failed",
                        );
                    }
                    Bson::Null
                }
            }
        }
        (DataType::Serial, ScalarRefImpl::Serial(v)) => Bson::Int64(v.into_inner()),
        // Structs become embedded documents: each sub-field is converted
        // recursively under its declared name.
        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
            let mut doc = Document::new();
            for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                st.iter()
                    .map(|(name, dt)| Field::with_name(dt.clone(), name)),
            ) {
                doc.insert(
                    sub_field.name.clone(),
                    datum_to_bson(&sub_field, sub_datum_ref),
                );
            }
            Bson::Document(doc)
        }
        // Lists become BSON arrays; elements are converted recursively via an
        // unnamed synthetic field carrying the element type.
        (DataType::List(dt), ScalarRefImpl::List(v)) => {
            let inner_field = Field::unnamed(Box::<DataType>::into_inner(dt));
            v.iter()
                .map(|scalar_ref| datum_to_bson(&inner_field, scalar_ref))
                .collect::<Bson>()
        }
        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => Bson::Binary(Binary {
            subtype: BinarySubtype::Generic,
            bytes: v.into(),
        }),
        // TODO(map): support map
        // Unsupported or mismatched (type, scalar) combinations: warn
        // (rate-limited) and encode as Null rather than failing the sink.
        _ => {
            if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                tracing::warn!(
                    suppressed_count,
                    ?field,
                    ?scalar_ref,
                    "datum_to_bson: unsupported data type"
                );
            }
            Bson::Null
        }
    }
}