1use std::fmt::Debug;
16
17use anyhow::Context;
18use risingwave_common::bail;
19use risingwave_common::types::DataType;
20
21use crate::error::ConnectorResult;
22use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
23use crate::parser::unified::debezium::DebeziumChangeEvent;
24use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
25use crate::parser::{
26 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties,
27 MongoProperties as MongoEncodingProperties, ParserFormat, SourceStreamChunkRowWriter,
28};
29use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
30
/// Parser for the `DEBEZIUM_MONGO` source format with MongoDB JSON encoding.
///
/// Each message's key and payload bytes are turned into accessors, combined into a MongoDB
/// Debezium change event, and applied to a [`SourceStreamChunkRowWriter`].
#[derive(Debug)]
pub struct DebeziumMongoJsonParser {
    /// Columns this parser emits; exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context the parser was created with; exposed through
    /// `ByteStreamSourceParser::source_ctx`.
    source_ctx: SourceContextRef,
    /// Builds accessors over the raw message key bytes.
    key_builder: AccessBuilderImpl,
    /// Builds accessors over the raw message payload bytes.
    payload_builder: AccessBuilderImpl,
}
38
39fn build_accessor_builder(config: EncodingProperties) -> anyhow::Result<AccessBuilderImpl> {
40 match config {
41 EncodingProperties::MongoJson(mongo_props) => Ok(AccessBuilderImpl::DebeziumMongoJson(
42 DebeziumMongoJsonAccessBuilder::new(mongo_props)?,
43 )),
44 _ => bail!("unsupported encoding for DEBEZIUM_MONGO format"),
45 }
46}
47
48impl DebeziumMongoJsonParser {
49 pub fn new(
50 rw_columns: Vec<SourceColumnDesc>,
51 source_ctx: SourceContextRef,
52 props: MongoEncodingProperties,
53 ) -> ConnectorResult<Self> {
54 let _id_column = rw_columns
55 .iter()
56 .find(|desc| {
57 desc.name == "_id"
58 && matches!(
59 desc.data_type,
60 DataType::Jsonb
61 | DataType::Varchar
62 | DataType::Int32
63 | DataType::Int64
64 )
65 })
66 .context("Debezium Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table")?.clone();
67
68 if !props.strong_schema {
69 let _payload_column = rw_columns
70 .iter()
71 .find(|desc| desc.name == "payload" && matches!(desc.data_type, DataType::Jsonb))
72 .context(
73 "Debezium Mongo needs a `payload` column with supported types Jsonb in table",
74 )?
75 .clone();
76
77 let columns = rw_columns
78 .iter()
79 .filter(|desc| desc.is_visible() && desc.additional_column.column_type.is_none())
80 .count();
81
82 if columns != 2 || !rw_columns.iter().any(|desc| desc.name == "_id") {
84 bail!("Debezium Mongo needs a `_id` column in table");
85 }
86 }
87
88 let encoding = EncodingProperties::MongoJson(props.clone());
90 let key_builder = build_accessor_builder(encoding.clone())?;
92
93 let payload_builder = build_accessor_builder(encoding)?;
94
95 Ok(Self {
96 rw_columns,
97 source_ctx,
98 key_builder,
99 payload_builder,
100 })
101 }
102
103 pub async fn parse_inner(
104 &mut self,
105 key: Option<Vec<u8>>,
106 payload: Option<Vec<u8>>,
107 mut writer: SourceStreamChunkRowWriter<'_>,
108 ) -> ConnectorResult<()> {
109 let meta = writer.source_meta();
110 let key_accessor = match key {
111 None => None,
112 Some(data) => Some(self.key_builder.generate_accessor(data, meta).await?),
113 };
114 let payload_accessor = match payload {
115 None => None,
116 Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
117 };
118
119 let row_op = DebeziumChangeEvent::new_mongodb_event(key_accessor, payload_accessor);
120 apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
121 }
122}
123
124impl ByteStreamSourceParser for DebeziumMongoJsonParser {
125 fn columns(&self) -> &[SourceColumnDesc] {
126 &self.rw_columns
127 }
128
129 fn source_ctx(&self) -> &SourceContext {
130 &self.source_ctx
131 }
132
133 fn parser_format(&self) -> ParserFormat {
134 ParserFormat::DebeziumMongo
135 }
136
137 async fn parse_one<'a>(
138 &'a mut self,
139 key: Option<Vec<u8>>,
140 payload: Option<Vec<u8>>,
141 writer: SourceStreamChunkRowWriter<'a>,
142 ) -> ConnectorResult<()> {
143 self.parse_inner(key, payload, writer).await
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use std::sync::Arc;
150
151 use risingwave_common::array::Op;
152 use risingwave_common::catalog::{ColumnDesc, ColumnId};
153 use risingwave_common::row::Row;
154 use risingwave_common::types::{ScalarImpl, StructType, ToOwnedDatum};
155
156 use super::*;
157 use crate::parser::unified::debezium::{extract_bson_field, extract_bson_id};
158 use crate::parser::{MongoProperties, SourceStreamChunkBuilder};
159 use crate::source::{ConnectorProperties, SourceCtrlOpts};
160 fn generate_source_context_ref() -> Arc<SourceContext> {
161 Arc::new(SourceContext {
162 connector_props: ConnectorProperties::MongodbCdc(Box::default()),
163 ..SourceContext::dummy()
164 })
165 }
166
167 #[test]
168 fn test_parse_bson_value_id_int() {
169 let data = r#"{"_id":{"$numberInt":"2345"}}"#;
170 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
171 let a = extract_bson_id(&DataType::Int32, &pld).unwrap();
172 assert_eq!(a, Some(ScalarImpl::Int32(2345)));
173 }
174 #[test]
175 fn test_parse_bson_value_id_long() {
176 let data = r#"{"_id":{"$numberLong":"22423434544"}}"#;
177 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
178
179 let a = extract_bson_id(&DataType::Int64, &pld).unwrap();
180 assert_eq!(a, Some(ScalarImpl::Int64(22423434544)));
181 }
182
183 #[test]
184 fn test_parse_bson_value_id_oid() {
185 let data = r#"{"_id":{"$oid":"5d505646cf6d4fe581014ab2"}}"#;
186 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
187 let a = extract_bson_id(&DataType::Varchar, &pld).unwrap();
188 assert_eq!(a, Some(ScalarImpl::Utf8("5d505646cf6d4fe581014ab2".into())));
189 }
190
191 #[test]
192 fn test_parse_bson_date() {
193 let data = r#"{"$date": {"$numberLong": "631152000000"}}"#;
194 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
195 let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
196 assert_eq!(
197 a,
198 Some(ScalarImpl::Date(
199 chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap().into()
200 ))
201 );
202
203 let data = r#"{"$date": 631152000000}"#;
204 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
205 let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
206 assert_eq!(
207 a,
208 Some(ScalarImpl::Date(
209 chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap().into()
210 ))
211 );
212
213 let data = r#"null"#;
214 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
215 let a = extract_bson_field(&DataType::Date, &pld, None).unwrap();
216 assert_eq!(a, None);
217 }
218 #[test]
219 fn test_parse_bson_timestamp() {
220 let data = r#"{"$timestamp": {"t": 1735689600, "i": 0}}"#;
221 let pld: serde_json::Value = serde_json::from_str(data).unwrap();
222 let a = extract_bson_field(&DataType::Timestamptz, &pld, None).unwrap();
223 assert_eq!(
224 a,
225 Some(ScalarImpl::Timestamptz(
226 chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
227 .unwrap()
228 .into()
229 ))
230 );
231 }
232
233 #[tokio::test]
234 async fn test_parse_delete_message() {
235 let (key, payload) = (
236 br#"{"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}"#.to_vec(),
238 br#"{"schema":null,"payload":{"before":null,"after":null,"updateDescription":null,"source":{"version":"2.4.2.Final","connector":"mongodb","name":"RW_CDC_3001","ts_ms":1706968217000,"snapshot":"false","db":"dev","sequence":null,"rs":"rs0","collection":"test","ord":2,"lsid":null,"txnNumber":null,"wallTime":null},"op":"d","ts_ms":1706968217377,"transaction":null}}"#.to_vec()
240 );
241
242 let columns = vec![
243 SourceColumnDesc::simple("_id", DataType::Varchar, ColumnId::from(0)),
244 SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
245 ];
246
247 let mut parser = DebeziumMongoJsonParser::new(
248 columns.clone(),
249 generate_source_context_ref(),
250 MongoProperties::default(),
251 )
252 .unwrap();
253 let mut builder =
254 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
255 parser
256 .parse_inner(Some(key), Some(payload), builder.row_writer())
257 .await
258 .unwrap();
259 builder.finish_current_chunk();
260 let chunk = builder.consume_ready_chunks().next().unwrap();
261 let mut rows = chunk.rows();
262
263 let (op, row) = rows.next().unwrap();
264 assert_eq!(op, Op::Delete);
265 assert_eq!(
267 row.datum_at(0).to_owned_datum(),
268 (Some(ScalarImpl::Utf8("65bc9fb6c485f419a7a877fe".into())))
269 );
270
271 assert_eq!(row.datum_at(1).to_owned_datum(), None);
273 }
274
275 #[tokio::test]
276 async fn test_long_id() {
277 let input = vec![
278 br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"
},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
280 br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()];
282
283 let columns = vec![
284 SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),
285 SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
286 ];
287 for data in input {
288 let mut parser = DebeziumMongoJsonParser::new(
289 columns.clone(),
290 generate_source_context_ref(),
291 MongoProperties::default(),
292 )
293 .unwrap();
294
295 let mut builder =
296 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
297
298 parser
299 .parse_inner(None, Some(data), builder.row_writer())
300 .await
301 .unwrap();
302 builder.finish_current_chunk();
303 let chunk = builder.consume_ready_chunks().next().unwrap();
304 let mut rows = chunk.rows();
305 let (op, row) = rows.next().unwrap();
306 assert_eq!(op, Op::Insert);
307
308 assert_eq!(
309 row.datum_at(1).to_owned_datum(),
310 (Some(ScalarImpl::Jsonb(
311 serde_json::json!({"_id": {"$numberLong": "1004"}, "first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"}).into()
312 )))
313 );
314 assert_eq!(
315 row.datum_at(0).to_owned_datum(),
316 (Some(ScalarImpl::Int64(1004)))
317 );
318 }
319 }
320
321 #[tokio::test]
322 async fn test_with_or_without_schema_field() {
323 let input = vec![
324 br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"
},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sally\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"true","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
325 br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sally\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"true","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()
326 ];
327
328 let columns = vec![
329 SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),
330 SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
331 ];
332
333 for data in input {
334 let mut parser = DebeziumMongoJsonParser::new(
335 columns.clone(),
336 generate_source_context_ref(),
337 MongoProperties::default(),
338 )
339 .unwrap();
340
341 let mut builder =
342 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
343
344 parser
345 .parse_inner(None, Some(data), builder.row_writer())
346 .await
347 .unwrap();
348 builder.finish_current_chunk();
349 let chunk = builder.consume_ready_chunks().next().unwrap();
350 let mut rows = chunk.rows();
351 let (op, row) = rows.next().unwrap();
352 assert_eq!(op, Op::Insert);
353
354 assert_eq!(
355 row.datum_at(1).to_owned_datum(),
356 (Some(ScalarImpl::Jsonb(
357serde_json::json!({"_id": {"$numberLong": "1001"},"first_name": "Sally","last_name": "Thomas","email": "sally.thomas@acme.com"}).into()
358 )))
359 );
360 assert_eq!(
361 row.datum_at(0).to_owned_datum(),
362 (Some(ScalarImpl::Int64(1001)))
363 );
364 }
365 }
366
367 #[tokio::test]
368 async fn test_strong_schema() {
369 let input = vec![
370 br#"{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"
},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
372 br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()];
374
375 let columns = vec![
376 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
377 ColumnDesc::named("first_name", ColumnId::new(1), DataType::Varchar),
378 ColumnDesc::named("last_name", ColumnId::new(2), DataType::Varchar),
379 ColumnDesc::named("email", ColumnId::new(3), DataType::Varchar),
380 ];
381
382 let columns = columns
383 .iter()
384 .map(SourceColumnDesc::from)
385 .collect::<Vec<_>>();
386
387 let source_ctx = generate_source_context_ref();
388
389 for data in input {
390 let mut parser = DebeziumMongoJsonParser::new(
391 columns.clone(),
392 source_ctx.clone(),
393 MongoProperties::new(true),
394 )
395 .expect("build parser");
396
397 let mut builder =
398 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
399
400 parser
401 .parse_inner(None, Some(data), builder.row_writer())
402 .await
403 .unwrap();
404 builder.finish_current_chunk();
405
406 let chunk = builder.consume_ready_chunks().next().unwrap();
407 let mut rows = chunk.rows();
408 let (op, row) = rows.next().unwrap();
409
410 assert_eq!(op, Op::Insert);
411
412 let data = [
413 ScalarImpl::Int64(1004),
414 ScalarImpl::Utf8("Anne".into()),
415 ScalarImpl::Utf8("Kretchmar".into()),
416 ScalarImpl::Utf8("annek@noanswer.org".into()),
417 ];
418
419 for (i, datum) in data.iter().enumerate() {
420 assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
421 }
422 }
423 }
424
425 #[tokio::test]
426 async fn test_strong_schema_datetime() {
427 let columns = vec![
428 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
429 ColumnDesc::named("rocket type", ColumnId::new(1), DataType::Varchar),
430 ColumnDesc::named("freezed at", ColumnId::new(2), DataType::Date),
431 ColumnDesc::named("launch time", ColumnId::new(3), DataType::Timestamptz),
432 ];
433
434 let columns = columns
435 .iter()
436 .map(SourceColumnDesc::from)
437 .collect::<Vec<_>>();
438
439 let data = vec![
440 br#"
442{
443 "before": null,
444 "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"rocket type\": \"Starblazer X-2000\", \"freezed at\": {\"$date\": 1733011200000 }, \"launch time\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
445 "source": {
446 "version": "2.1.4.Final",
447 "connector": "mongodb",
448 "name": "dbserver1",
449 "ts_ms": 1696502096000,
450 "snapshot": "false",
451 "db": "inventory",
452 "collection": "rockets",
453 "ord": 1,
454 "lsid": null,
455 "txnNumber": null
456 },
457 "op": "c",
458 "ts_ms": 1696502096000
459}
460"#.to_vec(),
461 br#"
462{
463 "schema": {
464 "type": "struct",
465 "fields": [
466 {
467 "field": "_id",
468 "type": "int64",
469 "optional": false
470 },
471 {
472 "field": "rocket type",
473 "type": "string",
474 "optional": true
475 },
476 {
477 "field": "freezed at",
478 "type": "string",
479 "name": "io.debezium.time.Date",
480 "version": 1,
481 "optional": true
482 },
483 {
484 "field": "launch time",
485 "type": "string",
486 "name": "io.debezium.time.ZonedTimestamp",
487 "version": 1,
488 "optional": true
489 }
490 ],
491 "optional": false,
492 "name": "dbserver1.inventory.rockets.Envelope"
493 },
494 "payload": {
495 "before": null,
496 "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"rocket type\": \"Starblazer X-2000\", \"freezed at\": {\"$date\": 1733011200000 }, \"launch time\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
497 "source": {
498 "version": "2.1.4.Final",
499 "connector": "mongodb",
500 "name": "dbserver1",
501 "ts_ms": 1696502096000,
502 "snapshot": "false",
503 "db": "inventory",
504 "collection": "rockets",
505 "ord": 1,
506 "lsid": null,
507 "txnNumber": null
508 },
509 "op": "c",
510 "ts_ms": 1696502096000
511 }
512}
513"#.to_vec()
514 ];
515
516 let source_ctx: Arc<_> = generate_source_context_ref();
517 let expected_datetime =
518 chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z").unwrap();
519 let expected_date = chrono::DateTime::parse_from_rfc3339("2024-12-01T00:00:00Z")
520 .unwrap()
521 .date_naive();
522
523 for datum in data {
524 let mut parser = DebeziumMongoJsonParser::new(
525 columns.clone(),
526 source_ctx.clone(),
527 MongoProperties::new(true),
528 )
529 .expect("build parser");
530 let mut builder =
531 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
532 parser
533 .parse_inner(None, Some(datum), builder.row_writer())
534 .await
535 .unwrap();
536 builder.finish_current_chunk();
537 let chunk = builder.consume_ready_chunks().next().unwrap();
538 let mut rows = chunk.rows();
539 let (op, row) = rows.next().unwrap();
540 assert_eq!(op, Op::Insert);
541 let data = [
542 ScalarImpl::Int64(1004),
543 ScalarImpl::Utf8("Starblazer X-2000".into()),
544 ScalarImpl::Date(expected_date.into()),
545 ScalarImpl::Timestamptz(expected_datetime.into()),
546 ];
547 for (i, datum) in data.iter().enumerate() {
548 assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
549 }
550 }
551 }
552
553 #[tokio::test]
554 async fn test_bson_v2_debezium_basic_types() {
555 let columns = vec![
556 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
557 ColumnDesc::named("name", ColumnId::new(1), DataType::Varchar),
558 ColumnDesc::named("age", ColumnId::new(2), DataType::Int32),
559 ColumnDesc::named("birth_date", ColumnId::new(3), DataType::Date),
560 ColumnDesc::named("created_at", ColumnId::new(4), DataType::Timestamptz),
561 ];
562
563 let columns = columns
564 .iter()
565 .map(SourceColumnDesc::from)
566 .collect::<Vec<_>>();
567
568 let packet = br#"
569 {
570 "schema": {
571 "type": "struct",
572 "fields": [
573 {"field": "_id", "type": "int64", "optional": false},
574 {"field": "name", "type": "string", "optional": true},
575 {"field": "age", "type": "int32", "optional": true},
576 {"field": "birth_date", "type": "string", "name": "io.debezium.time.Date", "version": 1, "optional": true},
577 {"field": "created_at", "type": "string", "name": "io.debezium.time.ZonedTimestamp", "version": 1, "optional": true}
578 ],
579 "optional": false,
580 "name": "dbserver1.inventory.users.Envelope"
581 },
582 "payload": {
583 "before": null,
584 "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": \"John Doe\", \"age\": {\"$numberInt\": \"30\"}, \"birth_date\": {\"$date\": 631152000000 }, \"created_at\": {\"$timestamp\": {\"t\": 1735689600, \"i\": 0}}}",
585 "source": {
586 "version": "2.1.4.Final",
587 "connector": "mongodb",
588 "name": "dbserver1",
589 "ts_ms": 1696502096000,
590 "snapshot": "false",
591 "db": "inventory",
592 "collection": "users",
593 "ord": 1,
594 "lsid": null,
595 "txnNumber": null
596 },
597 "op": "c",
598 "ts_ms": 1696502096000
599 }
600 }
601 "#.to_vec();
602
603 let source_ctx: Arc<_> = generate_source_context_ref();
604
605 let expected_birth_date = chrono::NaiveDate::from_ymd_opt(1990, 1, 1).unwrap();
606 let expected_created_at =
607 chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z").unwrap();
608
609 let mut parser = DebeziumMongoJsonParser::new(
610 columns.clone(),
611 source_ctx.clone(),
612 MongoProperties::new(true),
613 )
614 .expect("build parser");
615 let mut builder =
616 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
617 parser
618 .parse_inner(None, Some(packet), builder.row_writer())
619 .await
620 .unwrap();
621 builder.finish_current_chunk();
622 let chunk = builder.consume_ready_chunks().next().unwrap();
623 let mut rows = chunk.rows();
624 let (op, row) = rows.next().unwrap();
625 assert_eq!(op, Op::Insert);
626
627 let data = [
628 ScalarImpl::Int64(1004),
629 ScalarImpl::Utf8("John Doe".into()),
630 ScalarImpl::Int32(30),
631 ScalarImpl::Date(expected_birth_date.into()),
632 ScalarImpl::Timestamptz(expected_created_at.into()),
633 ];
634 for (i, datum) in data.iter().enumerate() {
635 assert_eq!(row.datum_at(i).to_owned_datum(), Some(datum.clone()));
636 }
637 }
638
639 #[tokio::test]
640 async fn test_bson_v2_debezium_struct() {
641 let columns = vec![
642 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
643 ColumnDesc::named(
644 "struct_data",
645 ColumnId::new(1),
646 DataType::Struct(StructType::new([
647 ("struct_field1".to_owned(), DataType::Int64),
648 ("struct_field2".to_owned(), DataType::Varchar),
649 ])),
650 ),
651 ];
652
653 let columns = columns
654 .iter()
655 .map(SourceColumnDesc::from)
656 .collect::<Vec<_>>();
657
658 let data = vec![
659 br#"
660 {
661 "schema": {
662 "type": "struct",
663 "fields": [
664 {"field": "_id", "type": "int64", "optional": false},
665 {
666 "field": "struct_data",
667 "type": "struct",
668 "fields": [
669 {"field": "struct_field1", "type": "int64", "optional": true},
670 {"field": "struct_field2", "type": "string", "optional": true}
671 ],
672 "optional": true
673 }
674 ],
675 "optional": false,
676 "name": "dbserver1.inventory.users.Envelope"
677 },
678 "payload": {
679 "before": null,
680 "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"struct_data\": {\"struct_field1\": {\"$numberLong\": \"1131231321\"}, \"struct_field2\": \"example_value\"}}",
681 "source": {
682 "version": "2.1.4.Final",
683 "connector": "mongodb",
684 "name": "dbserver1",
685 "ts_ms": 1696502096000,
686 "snapshot": "false",
687 "db": "inventory",
688 "collection": "users",
689 "ord": 1,
690 "lsid": null,
691 "txnNumber": null
692 },
693 "op": "c",
694 "ts_ms": 1696502096000
695 }
696 }
697 "#.to_vec(),
698 ];
699
700 let source_ctx = generate_source_context_ref();
701 for datum in data {
702 let mut parser = DebeziumMongoJsonParser::new(
703 columns.clone(),
704 source_ctx.clone(),
705 MongoProperties::new(true),
706 )
707 .expect("build parser");
708 let mut builder =
709 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
710 parser
711 .parse_inner(None, Some(datum), builder.row_writer())
712 .await
713 .unwrap();
714 builder.finish_current_chunk();
715 let chunk = builder.consume_ready_chunks().next().unwrap();
716 let mut rows = chunk.rows();
717 let (op, row) = rows.next().unwrap();
718 assert_eq!(op, Op::Insert);
719
720 assert_eq!(
722 row.datum_at(0).to_owned_datum(),
723 Some(ScalarImpl::Int64(1004))
724 );
725
726 let struct_data = row.datum_at(1).to_owned_datum().unwrap();
728 if let ScalarImpl::Struct(struct_data) = struct_data {
729 assert_eq!(struct_data.fields()[0], Some(ScalarImpl::Int64(1131231321)));
730 assert_eq!(
731 struct_data.fields()[1],
732 Some(ScalarImpl::Utf8("example_value".into()))
733 );
734 } else {
735 panic!("Expected struct type for struct_data");
736 }
737 }
738 }
739
740 #[tokio::test]
741 async fn test_bson_v2_debezium_list() {
742 let columns = vec![
743 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
744 ColumnDesc::named(
745 "hobbies",
746 ColumnId::new(1),
747 DataType::List(Box::new(DataType::Varchar)),
748 ),
749 ];
750
751 let columns = columns
752 .iter()
753 .map(SourceColumnDesc::from)
754 .collect::<Vec<_>>();
755
756 let data = vec![
757 br#"
758 {
759 "schema": {
760 "type": "struct",
761 "fields": [
762 {"field": "_id", "type": "int64", "optional": false},
763 {
764 "field": "hobbies",
765 "type": "array",
766 "items": {"type": "string", "optional": true},
767 "optional": true
768 }
769 ],
770 "optional": false,
771 "name": "dbserver1.inventory.users.Envelope"
772 },
773 "payload": {
774 "before": null,
775 "after": "{\"_id\": {\"$numberLong\": \"1004\"}, \"hobbies\": [\"reading\", \"traveling\", \"coding\"]}",
776 "source": {
777 "version": "2.1.4.Final",
778 "connector": "mongodb",
779 "name": "dbserver1",
780 "ts_ms": 1696502096000,
781 "snapshot": "false",
782 "db": "inventory",
783 "collection": "users",
784 "ord": 1,
785 "lsid": null,
786 "txnNumber": null
787 },
788 "op": "c",
789 "ts_ms": 1696502096000
790 }
791 }
792 "#.to_vec(),
793 ];
794
795 let source_ctx = generate_source_context_ref();
796
797 for datum in data {
798 let mut parser = DebeziumMongoJsonParser::new(
799 columns.clone(),
800 source_ctx.clone(),
801 MongoProperties::new(true),
802 )
803 .expect("build parser");
804 let mut builder =
805 SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test());
806 parser
807 .parse_inner(None, Some(datum), builder.row_writer())
808 .await
809 .unwrap();
810 builder.finish_current_chunk();
811 let chunk = builder.consume_ready_chunks().next().unwrap();
812 let mut rows = chunk.rows();
813 let (op, row) = rows.next().unwrap();
814 assert_eq!(op, Op::Insert);
815
816 assert_eq!(
818 row.datum_at(0).to_owned_datum(),
819 Some(ScalarImpl::Int64(1004))
820 );
821
822 let hobbies = row.datum_at(1).to_owned_datum().unwrap();
824 if let ScalarImpl::List(list_data) = hobbies {
825 assert_eq!(list_data.len(), 3);
826 assert_eq!(
827 list_data.get(0),
828 Some(Some(
829 ScalarImpl::Utf8("reading".into()).as_scalar_ref_impl()
830 ))
831 );
832 assert_eq!(
833 list_data.get(1),
834 Some(Some(
835 ScalarImpl::Utf8("traveling".into()).as_scalar_ref_impl()
836 ))
837 );
838 assert_eq!(
839 list_data.get(2),
840 Some(Some(ScalarImpl::Utf8("coding".into()).as_scalar_ref_impl()))
841 );
842 } else {
843 panic!("Expected list type for hobbies");
844 }
845 }
846 }
847 #[tokio::test]
848 async fn test_null_and_overflow() {
849 let columns = vec![
850 ColumnDesc::named("_id", ColumnId::new(0), DataType::Int64),
851 ColumnDesc::named("name", ColumnId::new(1), DataType::Varchar),
852 ColumnDesc::named("age", ColumnId::new(2), DataType::Int32),
853 ColumnDesc::named("birth_date", ColumnId::new(3), DataType::Date),
854 ColumnDesc::named("created_at", ColumnId::new(4), DataType::Timestamptz),
855 ];
856 let columns = columns
857 .iter()
858 .map(SourceColumnDesc::from)
859 .collect::<Vec<_>>();
860
861 let data = [
862 br#"{"schema":null,"payload":{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": null, \"age\": null, \"birth_date\": null, \"created_at\": null}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}}"#.to_vec(),
864 br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"}, \"name\": null, \"age\": null, \"birth_date\": null, \"created_at\": null}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec(),
866 ];
867
868 let source_ctx = generate_source_context_ref();
869
870 let mut parser = DebeziumMongoJsonParser::new(
871 columns.clone(),
872 source_ctx.clone(),
873 MongoProperties::new(true),
874 )
875 .expect("build parser");
876 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
877
878 for text in data {
880 parser
881 .parse_inner(None, Some(text.clone()), builder.row_writer())
882 .await
883 .unwrap();
884 builder.finish_current_chunk();
885 let chunk = builder.consume_ready_chunks().next().unwrap();
886 let mut rows = chunk.rows();
887 let (op, row) = rows.next().unwrap();
888 assert_eq!(op, Op::Insert);
889 assert_eq!(
890 row.datum_at(0).to_owned_datum(),
891 Some(ScalarImpl::Int64(1004))
892 );
893 for i in 1..5 {
894 assert_eq!(row.datum_at(i).to_owned_datum(), None);
895 }
896 }
897 }
898}