risingwave_connector/parser/debezium/mongo_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::Context;
18use risingwave_common::bail;
19use risingwave_common::types::DataType;
20
21use crate::error::ConnectorResult;
22use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
23use crate::parser::unified::debezium::DebeziumChangeEvent;
24use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
25use crate::parser::{
26    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties,
27    MongoProperties as MongoEncodingProperties, ParserFormat, SourceStreamChunkRowWriter,
28};
29use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
30
/// Parser for `FORMAT DEBEZIUM_MONGO` messages encoded as Debezium MongoDB JSON.
///
/// The key and payload bytes of a message are each turned into an accessor by their own
/// [`AccessBuilderImpl`]. The resulting pair is converted into a row operation via
/// [`DebeziumChangeEvent::new_mongodb_event`] and written to the chunk writer.
#[derive(Debug)]
pub struct DebeziumMongoJsonParser {
    /// Columns of the target source/table that parsed rows are written into.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared context of the source this parser belongs to.
    source_ctx: SourceContextRef,
    /// Builds accessors over the message key.
    key_builder: AccessBuilderImpl,
    /// Builds accessors over the message payload.
    payload_builder: AccessBuilderImpl,
}
38
39fn build_accessor_builder(config: EncodingProperties) -> anyhow::Result<AccessBuilderImpl> {
40    match config {
41        EncodingProperties::MongoJson(mongo_props) => Ok(AccessBuilderImpl::DebeziumMongoJson(
42            DebeziumMongoJsonAccessBuilder::new(mongo_props)?,
43        )),
44        _ => bail!("unsupported encoding for DEBEZIUM_MONGO format"),
45    }
46}
47
48impl DebeziumMongoJsonParser {
49    pub fn new(
50        rw_columns: Vec<SourceColumnDesc>,
51        source_ctx: SourceContextRef,
52        props: MongoEncodingProperties,
53    ) -> ConnectorResult<Self> {
54        let _id_column = rw_columns
55            .iter()
56            .find(|desc| {
57                desc.name == "_id"
58                    && matches!(
59                        desc.data_type,
60                        DataType::Jsonb
61                            | DataType::Varchar
62                            | DataType::Int32
63                            | DataType::Int64
64                    )
65            })
66            .context("Debezium Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table")?.clone();
67
68        if !props.strong_schema {
69            let _payload_column = rw_columns
70                .iter()
71                .find(|desc| desc.name == "payload" && matches!(desc.data_type, DataType::Jsonb))
72                .context(
73                    "Debezium Mongo needs a `payload` column with supported types Jsonb in table",
74                )?
75                .clone();
76
77            let columns = rw_columns
78                .iter()
79                .filter(|desc| desc.is_visible() && desc.additional_column.column_type.is_none())
80                .count();
81
82            // _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically.
83            if columns != 2 || !rw_columns.iter().any(|desc| desc.name == "_id") {
84                bail!("Debezium Mongo needs a `_id` column in table");
85            }
86        }
87
88        // encodings are fixed to MongoJson
89        let encoding = EncodingProperties::MongoJson(props.clone());
90        // for key, it doesn't matter if strong schema is enabled or not
91        let key_builder = build_accessor_builder(encoding.clone())?;
92
93        let payload_builder = build_accessor_builder(encoding)?;
94
95        Ok(Self {
96            rw_columns,
97            source_ctx,
98            key_builder,
99            payload_builder,
100        })
101    }
102
103    pub async fn parse_inner(
104        &mut self,
105        key: Option<Vec<u8>>,
106        payload: Option<Vec<u8>>,
107        mut writer: SourceStreamChunkRowWriter<'_>,
108    ) -> ConnectorResult<()> {
109        let meta = writer.source_meta();
110        let key_accessor = match key {
111            None => None,
112            Some(data) => Some(self.key_builder.generate_accessor(data, meta).await?),
113        };
114        let payload_accessor = match payload {
115            None => None,
116            Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
117        };
118
119        let row_op = DebeziumChangeEvent::new_mongodb_event(key_accessor, payload_accessor);
120        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
121    }
122}
123
124impl ByteStreamSourceParser for DebeziumMongoJsonParser {
125    fn columns(&self) -> &[SourceColumnDesc] {
126        &self.rw_columns
127    }
128
129    fn source_ctx(&self) -> &SourceContext {
130        &self.source_ctx
131    }
132
133    fn parser_format(&self) -> ParserFormat {
134        ParserFormat::DebeziumMongo
135    }
136
137    async fn parse_one<'a>(
138        &'a mut self,
139        key: Option<Vec<u8>>,
140        payload: Option<Vec<u8>>,
141        writer: SourceStreamChunkRowWriter<'a>,
142    ) -> ConnectorResult<()> {
143        self.parse_inner(key, payload, writer).await
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use std::sync::Arc;
150
151    use risingwave_common::array::Op;
152    use risingwave_common::catalog::{ColumnDesc, ColumnId};
153    use risingwave_common::row::Row;
154    use risingwave_common::types::{ScalarImpl, StructType, ToOwnedDatum};
155
156    use super::*;
157    use crate::parser::unified::debezium::{extract_bson_field, extract_bson_id};
158    use crate::parser::{MongoProperties, SourceStreamChunkBuilder};
159    use crate::source::{ConnectorProperties, SourceCtrlOpts};
160    fn generate_source_context_ref() -> Arc<SourceContext> {
161        Arc::new(SourceContext {
162            connector_props: ConnectorProperties::MongodbCdc(Box::default()),
163            ..SourceContext::dummy()
164        })
165    }
166
167    #[test]
168    fn test_parse_bson_value_id_int() {
169        let data = r#"{"_id":{"$numberInt":"2345"}}"#;
170        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
171        let a = extract_bson_id(&DataType::Int32, &pld).unwrap();
172        assert_eq!(a, Some(ScalarImpl::Int32(2345)));
173    }
174    #[test]
175    fn test_parse_bson_value_id_long() {
176        let data = r#"{"_id":{"$numberLong":"22423434544"}}"#;
177        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
178
179        let a = extract_bson_id(&DataType::Int64, &pld).unwrap();
180        assert_eq!(a, Some(ScalarImpl::Int64(22423434544)));
181    }
182
183    #[test]
184    fn test_parse_bson_value_id_oid() {
185        let data = r#"{"_id":{"$oid":"5d505646cf6d4fe581014ab2"}}"#;
186        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
187        let a = extract_bson_id(&DataType::Varchar, &pld).unwrap();
188        assert_eq!(a, Some(ScalarImpl::Utf8("5d505646cf6d4fe581014ab2".into())));
189    }
190
191    #[test]
192    fn test_parse_bson_date() {
193        let data = r#"{"$date": {"$numberLong": "631152000000"}}"#;
194        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
195        let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
196        assert_eq!(
197            a,
198            Some(ScalarImpl::Date(
199                chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap().into()
200            ))
201        );
202
203        let data = r#"{"$date": 631152000000}"#;
204        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
205        let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
206        assert_eq!(
207            a,
208            Some(ScalarImpl::Date(
209                chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap().into()
210            ))
211        );
212
213        let data = r#"null"#;
214        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
215        let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
216        assert_eq!(a, None);
217    }
218    #[test]
219    fn test_parse_bson_timestamp() {
220        let data = r#"{"$timestamp": {"t": 1735689600, "i": 0}}"#;
221        let pld: serde_json::Value = serde_json::from_str(data).unwrap();
222        let a = extract_bson_field(&DataType::Timestamptz, &pld, None).unwrap();
223        assert_eq!(
224            a,
225            Some(ScalarImpl::Timestamptz(
226                chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
227                    .unwrap()
228                    .into()
229            ))
230        );
231    }
232
233    #[tokio::test]
234    async fn test_parse_delete_message() {
235        let (key, payload) = (
236            // key
237            br#"{"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}"#.to_vec(),
238            // payload
239            br#"{"schema":null,"payload":{"before":null,"after":null,"updateDescription":null,"source":{"version":"2.4.2.Final","connector":"mongodb","name":"RW_CDC_3001","ts_ms":1706968217000,"snapshot":"false","db":"dev","sequence":null,"rs":"rs0","collection":"test","ord":2,"lsid":null,"txnNumber":null,"wallTime":null},"op":"d","ts_ms":1706968217377,"transaction":null}}"#.to_vec()
240        );
241
242        let columns = vec![
243            SourceColumnDesc::simple("_id", DataType::Varchar, ColumnId::from(0)),
244            SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
245        ];
246
247        let mut parser = DebeziumMongoJsonParser::new(
248            columns.clone(),
249            generate_source_context_ref(),
250            MongoProperties::default(),
251        )
252        .unwrap();
253        let mut builder =
254            SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
255        parser
256            .parse_inner(Some(key), Some(payload), builder.row_writer())
257            .await
258            .unwrap();
259        builder.finish_current_chunk();
260        let chunk = builder.consume_ready_chunks().next().unwrap();
261        let mut rows = chunk.rows();
262
263        let (op, row) = rows.next().unwrap();
264        assert_eq!(op, Op::Delete);
265        // oid
266        assert_eq!(
267            row.datum_at(0).to_owned_datum(),
268            (Some(ScalarImpl::Utf8("65bc9fb6c485f419a7a877fe".into())))
269        );
270
271        // payload should be null
272        assert_eq!(row.datum_at(1).to_owned_datum(), None);
273    }
274
275    #[tokio::test]
276    async fn test_long_id() {
277        let input = vec![
278            // data with payload and schema field
279            br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"fie
ld":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
280            // data without payload and schema field
281            br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()];
282
283        let columns = vec![
284            SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),
285            SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
286        ];
287        for data in input {
288            let mut parser = DebeziumMongoJsonParser::new(
289                columns.clone(),
290                generate_source_context_ref(),
291                MongoProperties::default(),
292            )
293            .unwrap();
294
295            let mut builder =
296                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
297
298            parser
299                .parse_inner(None, Some(data), builder.row_writer())
300                .await
301                .unwrap();
302            builder.finish_current_chunk();
303            let chunk = builder.consume_ready_chunks().next().unwrap();
304            let mut rows = chunk.rows();
305            let (op, row) = rows.next().unwrap();
306            assert_eq!(op, Op::Insert);
307
308            assert_eq!(
309            row.datum_at(1).to_owned_datum(),
310            (Some(ScalarImpl::Jsonb(
311                serde_json::json!({"_id": {"$numberLong": "1004"}, "first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"}).into()
312            )))
313            );
314            assert_eq!(
315                row.datum_at(0).to_owned_datum(),
316                (Some(ScalarImpl::Int64(1004)))
317            );
318        }
319    }
320
321    #[tokio::test]
322    async fn test_with_or_without_schema_field() {
323        let input = vec![
324        br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":
"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sally\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"true","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
325        br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sally\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"true","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()
326        ];
327
328        let columns = vec![
329            SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),
330            SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
331        ];
332
333        for data in input {
334            let mut parser = DebeziumMongoJsonParser::new(
335                columns.clone(),
336                generate_source_context_ref(),
337                MongoProperties::default(),
338            )
339            .unwrap();
340
341            let mut builder =
342                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
343
344            parser
345                .parse_inner(None, Some(data), builder.row_writer())
346                .await
347                .unwrap();
348            builder.finish_current_chunk();
349            let chunk = builder.consume_ready_chunks().next().unwrap();
350            let mut rows = chunk.rows();
351            let (op, row) = rows.next().unwrap();
352            assert_eq!(op, Op::Insert);
353
354            assert_eq!(
355            row.datum_at(1).to_owned_datum(),
356            (Some(ScalarImpl::Jsonb(
357serde_json::json!({"_id": {"$numberLong": "1001"},"first_name": "Sally","last_name": "Thomas","email": "sally.thomas@acme.com"}).into()
358            )))
359            );
360            assert_eq!(
361                row.datum_at(0).to_owned_datum(),
362                (Some(ScalarImpl::Int64(1001)))
363            );
364        }
365    }
366
367    #[tokio::test]
368    async fn test_strong_schema() {
369        let input = vec![
370            // data with payload and schema field
371            br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"fie
ld":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
372            // data without payload and schema field
373            br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()];
374
375        let columns = vec![
376            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
377            ColumnDesc::named("first_name", ColumnId::new(1), DataType::Varchar),
378            ColumnDesc::named("last_name", ColumnId::new(2), DataType::Varchar),
379            ColumnDesc::named("email", ColumnId::new(3), DataType::Varchar),
380        ];
381
382        let columns = columns
383            .iter()
384            .map(SourceColumnDesc::from)
385            .collect::<Vec<_>>();
386
387        let source_ctx = generate_source_context_ref();
388
389        for data in input {
390            let mut parser = DebeziumMongoJsonParser::new(
391                columns.clone(),
392                source_ctx.clone(),
393                MongoProperties::new(true),
394            )
395            .expect("build parser");
396
397            let mut builder =
398                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
399
400            parser
401                .parse_inner(None, Some(data), builder.row_writer())
402                .await
403                .unwrap();
404            builder.finish_current_chunk();
405
406            let chunk = builder.consume_ready_chunks().next().unwrap();
407            let mut rows = chunk.rows();
408            let (op, row) = rows.next().unwrap();
409
410            assert_eq!(op, Op::Insert);
411
412            let data = [
413                ScalarImpl::Int64(1004),
414                ScalarImpl::Utf8("Anne".into()),
415                ScalarImpl::Utf8("Kretchmar".into()),
416                ScalarImpl::Utf8("annek@noanswer.org".into()),
417            ];
418
419            for (i, datum) in data.iter().enumerate() {
420                assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
421            }
422        }
423    }
424
425    #[tokio::test]
426    async fn test_strong_schema_datetime() {
427        let columns = vec![
428            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
429            ColumnDesc::named("rocket type", ColumnId::new(1), DataType::Varchar),
430            ColumnDesc::named("freezed at", ColumnId::new(2), DataType::Date),
431            ColumnDesc::named("launch time", ColumnId::new(3), DataType::Timestamptz),
432        ];
433
434        let columns = columns
435            .iter()
436            .map(SourceColumnDesc::from)
437            .collect::<Vec<_>>();
438
439        let data = vec![
440            // naked data
441 br#"
442{
443  "before": null,
444  "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"rocket type\": \"Starblazer X-2000\", \"freezed at\": {\"$date\": 1733011200000 }, \"launch time\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
445  "source": {
446    "version": "2.1.4.Final",
447    "connector": "mongodb",
448    "name": "dbserver1",
449    "ts_ms": 1696502096000,
450    "snapshot": "false",
451    "db": "inventory",
452    "collection": "rockets",
453    "ord": 1,
454    "lsid": null,
455    "txnNumber": null
456  },
457  "op": "c",
458  "ts_ms": 1696502096000
459}
460"#.to_vec(),
461            br#"
462{
463  "schema": {
464    "type": "struct",
465    "fields": [
466      {
467        "field": "_id",
468        "type": "int64",
469        "optional": false
470      },
471      {
472        "field": "rocket type",
473        "type": "string",
474        "optional": true
475      },
476      {
477        "field": "freezed at",
478        "type": "string",
479        "name": "io.debezium.time.Date",
480        "version": 1,
481        "optional": true
482      },
483      {
484        "field": "launch time",
485        "type": "string",
486        "name": "io.debezium.time.ZonedTimestamp",
487        "version": 1,
488        "optional": true
489      }
490    ],
491    "optional": false,
492    "name": "dbserver1.inventory.rockets.Envelope"
493  },
494  "payload": {
495    "before": null,
496    "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"rocket type\": \"Starblazer X-2000\", \"freezed at\": {\"$date\": 1733011200000 }, \"launch time\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
497    "source": {
498      "version": "2.1.4.Final",
499      "connector": "mongodb",
500      "name": "dbserver1",
501      "ts_ms": 1696502096000,
502      "snapshot": "false",
503      "db": "inventory",
504      "collection": "rockets",
505      "ord": 1,
506      "lsid": null,
507      "txnNumber": null
508    },
509    "op": "c",
510    "ts_ms": 1696502096000
511  }
512}
513"#.to_vec()
514        ];
515
516        let source_ctx: Arc<_> = generate_source_context_ref();
517        let expected_datetime =
518            chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z").unwrap();
519        let expected_date = chrono::DateTime::parse_from_rfc3339("2024-12-01T00:00:00Z")
520            .unwrap()
521            .date_naive();
522
523        for datum in data {
524            let mut parser = DebeziumMongoJsonParser::new(
525                columns.clone(),
526                source_ctx.clone(),
527                MongoProperties::new(true),
528            )
529            .expect("build parser");
530            let mut builder =
531                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
532            parser
533                .parse_inner(None, Some(datum), builder.row_writer())
534                .await
535                .unwrap();
536            builder.finish_current_chunk();
537            let chunk = builder.consume_ready_chunks().next().unwrap();
538            let mut rows = chunk.rows();
539            let (op, row) = rows.next().unwrap();
540            assert_eq!(op, Op::Insert);
541            let data = [
542                ScalarImpl::Int64(1004),
543                ScalarImpl::Utf8("Starblazer X-2000".into()),
544                ScalarImpl::Date(expected_date.into()),
545                ScalarImpl::Timestamptz(expected_datetime.into()),
546            ];
547            for (i, datum) in data.iter().enumerate() {
548                assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
549            }
550        }
551    }
552
553    #[tokio::test]
554    async fn test_bson_v2_debezium_basic_types() {
555        let columns = vec![
556            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
557            ColumnDesc::named("name", ColumnId::new(1), DataType::Varchar),
558            ColumnDesc::named("age", ColumnId::new(2), DataType::Int32),
559            ColumnDesc::named("birth_date", ColumnId::new(3), DataType::Date),
560            ColumnDesc::named("created_at", ColumnId::new(4), DataType::Timestamptz),
561        ];
562
563        let columns = columns
564            .iter()
565            .map(SourceColumnDesc::from)
566            .collect::<Vec<_>>();
567
568        let packet = br#"
569        {
570            "schema": {
571                "type": "struct",
572                "fields": [
573                    {"field": "_id", "type": "int64", "optional": false},
574                    {"field": "name", "type": "string", "optional": true},
575                    {"field": "age", "type": "int32", "optional": true},
576                    {"field": "birth_date", "type": "string", "name": "io.debezium.time.Date", "version": 1, "optional": true},
577                    {"field": "created_at", "type": "string", "name": "io.debezium.time.ZonedTimestamp", "version": 1, "optional": true}
578                ],
579                "optional": false,
580                "name": "dbserver1.inventory.users.Envelope"
581            },
582            "payload": {
583                "before": null,
584                "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": \"John Doe\", \"age\": {\"$numberInt\": \"30\"}, \"birth_date\": {\"$date\": 631152000000 }, \"created_at\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
585                "source": {
586                    "version": "2.1.4.Final",
587                    "connector": "mongodb",
588                    "name": "dbserver1",
589                    "ts_ms": 1696502096000,
590                    "snapshot": "false",
591                    "db": "inventory",
592                    "collection": "users",
593                    "ord": 1,
594                    "lsid": null,
595                    "txnNumber": null
596                },
597                "op": "c",
598                "ts_ms": 1696502096000
599            }
600        }
601        "#.to_vec();
602
603        let source_ctx: Arc<_> = generate_source_context_ref();
604
605        let expected_birth_date = chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap();
606        let expected_created_at =
607            chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z").unwrap();
608
609        let mut parser = DebeziumMongoJsonParser::new(
610            columns.clone(),
611            source_ctx.clone(),
612            MongoProperties::new(true),
613        )
614        .expect("build parser");
615        let mut builder =
616            SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
617        parser
618            .parse_inner(None, Some(packet), builder.row_writer())
619            .await
620            .unwrap();
621        builder.finish_current_chunk();
622        let chunk = builder.consume_ready_chunks().next().unwrap();
623        let mut rows = chunk.rows();
624        let (op, row) = rows.next().unwrap();
625        assert_eq!(op, Op::Insert);
626
627        let data = [
628            ScalarImpl::Int64(1004),
629            ScalarImpl::Utf8("John Doe".into()),
630            ScalarImpl::Int32(30),
631            ScalarImpl::Date(expected_birth_date.into()),
632            ScalarImpl::Timestamptz(expected_created_at.into()),
633        ];
634        for (i, datum) in data.iter().enumerate() {
635            assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
636        }
637    }
638
639    #[tokio::test]
640    async fn test_bson_v2_debezium_struct() {
641        let columns = vec![
642            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
643            ColumnDesc::named(
644                "struct_data",
645                ColumnId::new(1),
646                DataType::Struct(StructType::new([
647                    ("struct_field1".to_owned(), DataType::Int64),
648                    ("struct_field2".to_owned(), DataType::Varchar),
649                ])),
650            ),
651        ];
652
653        let columns = columns
654            .iter()
655            .map(SourceColumnDesc::from)
656            .collect::<Vec<_>>();
657
658        let data = vec![
659        br#"
660        {
661            "schema": {
662                "type": "struct",
663                "fields": [
664                    {"field": "_id", "type": "int64", "optional": false},
665                    {
666                        "field": "struct_data",
667                        "type": "struct",
668                        "fields": [
669                            {"field": "struct_field1", "type": "int64", "optional": true},
670                            {"field": "struct_field2", "type": "string", "optional": true}
671                        ],
672                        "optional": true
673                    }
674                ],
675                "optional": false,
676                "name": "dbserver1.inventory.users.Envelope"
677            },
678            "payload": {
679                "before": null,
680                "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"struct_data\": {\"struct_field1\": {\"$numberLong\": \"1131231321\"}, \"struct_field2\": \"example_value\"}}",
681                "source": {
682                    "version": "2.1.4.Final",
683                    "connector": "mongodb",
684                    "name": "dbserver1",
685                    "ts_ms": 1696502096000,
686                    "snapshot": "false",
687                    "db": "inventory",
688                    "collection": "users",
689                    "ord": 1,
690                    "lsid": null,
691                    "txnNumber": null
692                },
693                "op": "c",
694                "ts_ms": 1696502096000
695            }
696        }
697        "#.to_vec(),
698    ];
699
700        let source_ctx = generate_source_context_ref();
701        for datum in data {
702            let mut parser = DebeziumMongoJsonParser::new(
703                columns.clone(),
704                source_ctx.clone(),
705                MongoProperties::new(true),
706            )
707            .expect("build parser");
708            let mut builder =
709                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
710            parser
711                .parse_inner(None, Some(datum), builder.row_writer())
712                .await
713                .unwrap();
714            builder.finish_current_chunk();
715            let chunk = builder.consume_ready_chunks().next().unwrap();
716            let mut rows = chunk.rows();
717            let (op, row) = rows.next().unwrap();
718            assert_eq!(op, Op::Insert);
719
720            // Verify _id
721            assert_eq!(
722                row.datum_at(0).to_owned_datum(),
723                Some(ScalarImpl::Int64(1004))
724            );
725
726            // Verify struct_data (struct)
727            let struct_data = row.datum_at(1).to_owned_datum().unwrap();
728            if let ScalarImpl::Struct(struct_data) = struct_data {
729                assert_eq!(struct_data.fields()[0], Some(ScalarImpl::Int64(1131231321)));
730                assert_eq!(
731                    struct_data.fields()[1],
732                    Some(ScalarImpl::Utf8("example_value".into()))
733                );
734            } else {
735                panic!("Expected struct type for struct_data");
736            }
737        }
738    }
739
740    #[tokio::test]
741    async fn test_bson_v2_debezium_list() {
742        let columns = vec![
743            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
744            ColumnDesc::named(
745                "hobbies",
746                ColumnId::new(1),
747                DataType::List(Box::new(DataType::Varchar)),
748            ),
749        ];
750
751        let columns = columns
752            .iter()
753            .map(SourceColumnDesc::from)
754            .collect::<Vec<_>>();
755
756        let data = vec![
757        br#"
758        {
759            "schema": {
760                "type": "struct",
761                "fields": [
762                    {"field": "_id", "type": "int64", "optional": false},
763                    {
764                        "field": "hobbies",
765                        "type": "array",
766                        "items": {"type": "string", "optional": true},
767                        "optional": true
768                    }
769                ],
770                "optional": false,
771                "name": "dbserver1.inventory.users.Envelope"
772            },
773            "payload": {
774                "before": null,
775                "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"hobbies\": [\"reading\", \"traveling\", \"coding\"]}",
776                "source": {
777                    "version": "2.1.4.Final",
778                    "connector": "mongodb",
779                    "name": "dbserver1",
780                    "ts_ms": 1696502096000,
781                    "snapshot": "false",
782                    "db": "inventory",
783                    "collection": "users",
784                    "ord": 1,
785                    "lsid": null,
786                    "txnNumber": null
787                },
788                "op": "c",
789                "ts_ms": 1696502096000
790            }
791        }
792        "#.to_vec(),
793    ];
794
795        let source_ctx = generate_source_context_ref();
796
797        for datum in data {
798            let mut parser = DebeziumMongoJsonParser::new(
799                columns.clone(),
800                source_ctx.clone(),
801                MongoProperties::new(true),
802            )
803            .expect("build parser");
804            let mut builder =
805                SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
806            parser
807                .parse_inner(None, Some(datum), builder.row_writer())
808                .await
809                .unwrap();
810            builder.finish_current_chunk();
811            let chunk = builder.consume_ready_chunks().next().unwrap();
812            let mut rows = chunk.rows();
813            let (op, row) = rows.next().unwrap();
814            assert_eq!(op, Op::Insert);
815
816            // Verify _id
817            assert_eq!(
818                row.datum_at(0).to_owned_datum(),
819                Some(ScalarImpl::Int64(1004))
820            );
821
822            // Verify hobbies (list)
823            let hobbies = row.datum_at(1).to_owned_datum().unwrap();
824            if let ScalarImpl::List(list_data) = hobbies {
825                assert_eq!(list_data.len(), 3);
826                assert_eq!(
827                    list_data.get(0),
828                    Some(Some(
829                        ScalarImpl::Utf8("reading".into()).as_scalar_ref_impl()
830                    ))
831                );
832                assert_eq!(
833                    list_data.get(1),
834                    Some(Some(
835                        ScalarImpl::Utf8("traveling".into()).as_scalar_ref_impl()
836                    ))
837                );
838                assert_eq!(
839                    list_data.get(2),
840                    Some(Some(ScalarImpl::Utf8("coding".into()).as_scalar_ref_impl()))
841                );
842            } else {
843                panic!("Expected list type for hobbies");
844            }
845        }
846    }
847    #[tokio::test]
848    async fn test_null_and_overflow() {
849        let columns = vec![
850            ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
851            ColumnDesc::named("name", ColumnId::new(1), DataType::Varchar),
852            ColumnDesc::named("age", ColumnId::new(2), DataType::Int32),
853            ColumnDesc::named("birth_date", ColumnId::new(3), DataType::Date),
854            ColumnDesc::named("created_at", ColumnId::new(4), DataType::Timestamptz),
855        ];
856        let columns = columns
857            .iter()
858            .map(SourceColumnDesc::from)
859            .collect::<Vec<_>>();
860
861        let data = [
862            // all, all fields except _id is null
863            br#"{"schema":null,"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": null, \"age\": null, \"birth_date\": null, \"created_at\": null}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
864            // payload field only
865            br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": null, \"age\": null, \"birth_date\": null, \"created_at\": null}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec(),
866        ];
867
868        let source_ctx = generate_source_context_ref();
869
870        let mut parser = DebeziumMongoJsonParser::new(
871            columns.clone(),
872            source_ctx.clone(),
873            MongoProperties::new(true),
874        )
875        .expect("build parser");
876        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
877
878        // all null
879        for text in data {
880            parser
881                .parse_inner(None, Some(text.clone()), builder.row_writer())
882                .await
883                .unwrap();
884            builder.finish_current_chunk();
885            let chunk = builder.consume_ready_chunks().next().unwrap();
886            let mut rows = chunk.rows();
887            let (op, row) = rows.next().unwrap();
888            assert_eq!(op, Op::Insert);
889            assert_eq!(
890                row.datum_at(0).to_owned_datum(),
891                Some(ScalarImpl::Int64(1004))
892            );
893            for i in 1..5 {
894                assert_eq!(row.datum_at(i).to_owned_datum(), None);
895            }
896        }
897    }
898}