risingwave_connector/parser/unified/debezium.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::types::{
20    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
21    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
22};
23use risingwave_connector_codec::decoder::AccessExt;
24use risingwave_pb::plan_common::additional_column::ColumnType;
25use thiserror_ext::AsReport;
26
27use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
28use crate::parser::TransactionControl;
29use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
30use crate::parser::schema_change::TableChangeType;
31use crate::source::cdc::build_cdc_table_id;
32use crate::source::cdc::external::mysql::{
33    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
34};
35use crate::source::{ConnectorProperties, SourceColumnDesc};
36
/// A change event decoded from a Debezium envelope.
///
/// Example of a Debezium JSON value:
///
/// ```json
/// {
///     "payload":
///     {
///         "before": null,
///         "after":
///         {
///             "O_ORDERKEY": 5,
///             "O_CUSTKEY": 44485,
///             "O_ORDERSTATUS": "F",
///             "O_TOTALPRICE": "144659.20",
///             "O_ORDERDATE": "1994-07-30"
///         },
///         "source":
///         {
///             "version": "1.9.7.Final",
///             "connector": "mysql",
///             "name": "RW_CDC_1002",
///             "ts_ms": 1695277757000,
///             "db": "mydb",
///             "sequence": null,
///             "table": "orders",
///             "server_id": 0,
///             "gtid": null,
///             "file": "binlog.000008",
///             "pos": 3693,
///             "row": 0
///         },
///         "op": "r",
///         "ts_ms": 1695277757017,
///         "transaction": null
///     }
/// }
/// ```
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message key. Used for MongoDB `_id` on deletes, and for every
    /// field when no value accessor is present.
    key_accessor: Option<A>,
    /// Accessor over the message value. When absent, the event is treated as a delete.
    value_accessor: Option<A>,
    /// Whether the event comes from the MongoDB connector, which changes how deletes read `_id`.
    is_mongodb: bool,
}
75
// Row images and operation code inside a Debezium change-event payload.
const BEFORE: &str = "before";
const AFTER: &str = "after";
const OP: &str = "op";

// Schema change DDL text and the `source` metadata block with its sub-fields.
const UPSTREAM_DDL: &str = "ddl";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

/// Field holding the status of a transaction metadata event.
pub const TRANSACTION_STATUS: &str = "status";
/// Field holding the id of a transaction metadata event.
pub const TRANSACTION_ID: &str = "id";

/// Field listing the per-table changes of a schema change event.
pub const TABLE_CHANGES: &str = "tableChanges";

/// Debezium `op` code for a snapshot read.
pub const DEBEZIUM_READ_OP: &str = "r";
/// Debezium `op` code for an insert.
pub const DEBEZIUM_CREATE_OP: &str = "c";
/// Debezium `op` code for an update.
pub const DEBEZIUM_UPDATE_OP: &str = "u";
/// Debezium `op` code for a delete.
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// Transaction status marking the start of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// Transaction status marking the end (commit) of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
100
101pub fn parse_transaction_meta(
102    accessor: &impl Access,
103    connector_props: &ConnectorProperties,
104) -> AccessResult<TransactionControl> {
105    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
106        accessor
107            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
108            .to_datum_ref(),
109        accessor
110            .access(&[TRANSACTION_ID], &DataType::Varchar)?
111            .to_datum_ref(),
112    ) {
113        // The id field has different meanings for different databases:
114        // PG: txID:LSN
115        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
116        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
117        match status {
118            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
119                ConnectorProperties::PostgresCdc(_) => {
120                    let (tx_id, _) = id.split_once(':').unwrap();
121                    return Ok(TransactionControl::Begin { id: tx_id.into() });
122                }
123                ConnectorProperties::MysqlCdc(_) => {
124                    return Ok(TransactionControl::Begin { id: id.into() });
125                }
126                ConnectorProperties::SqlServerCdc(_) => {
127                    return Ok(TransactionControl::Begin { id: id.into() });
128                }
129                _ => {}
130            },
131            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
132                ConnectorProperties::PostgresCdc(_) => {
133                    let (tx_id, _) = id.split_once(':').unwrap();
134                    return Ok(TransactionControl::Commit { id: tx_id.into() });
135                }
136                ConnectorProperties::MysqlCdc(_) => {
137                    return Ok(TransactionControl::Commit { id: id.into() });
138                }
139                ConnectorProperties::SqlServerCdc(_) => {
140                    return Ok(TransactionControl::Commit { id: id.into() });
141                }
142                _ => {}
143            },
144            _ => {}
145        }
146    }
147
148    Err(AccessError::Undefined {
149        name: "transaction status".into(),
150        path: TRANSACTION_STATUS.into(),
151    })
152}
153
/// Reads the object field `$field` of the jsonb value `$col` and converts it with the
/// `as_<$as_type>()` accessor.
///
/// Panics (via `unwrap`) if the field is missing or the conversion fails.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {{
        let field_value = $col.access_object_field($field).unwrap();
        $crate::paste! { field_value.[<as_ $as_type>]().unwrap() }
    }};
}
161
162/// Parse the schema change message from Debezium.
163/// The layout of MySQL schema change message can refer to
164/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
165pub fn parse_schema_change(
166    accessor: &impl Access,
167    source_id: u32,
168    connector_props: &ConnectorProperties,
169) -> AccessResult<SchemaChangeEnvelope> {
170    let mut schema_changes = vec![];
171
172    let upstream_ddl: String = accessor
173        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
174        .to_owned_datum()
175        .unwrap()
176        .as_utf8()
177        .to_string();
178
179    if let Some(ScalarRefImpl::List(table_changes)) = accessor
180        .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
181        .to_datum_ref()
182    {
183        for datum in table_changes.iter() {
184            let jsonb = match datum {
185                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
186                _ => unreachable!(""),
187            };
188
189            let id = jsonb_access_field!(jsonb, "id", string);
190            let ty = jsonb_access_field!(jsonb, "type", string);
191            let ddl_type: TableChangeType = ty.as_str().into();
192            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
193                tracing::debug!("skip table schema change for create/drop command");
194                continue;
195            }
196
197            let mut column_descs: Vec<ColumnDesc> = vec![];
198            if let Some(table) = jsonb.access_object_field("table")
199                && let Some(columns) = table.access_object_field("columns")
200            {
201                for col in columns.array_elements().unwrap() {
202                    let name = jsonb_access_field!(col, "name", string);
203                    let type_name = jsonb_access_field!(col, "typeName", string);
204
205                    let data_type = match *connector_props {
206                        ConnectorProperties::PostgresCdc(_) => {
207                            DataType::from_str(type_name.as_str()).map_err(|err| {
208                                tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
209                                AccessError::UnsupportedType {
210                                    ty: type_name.clone(),
211                                }
212                            })?
213                        }
214                        ConnectorProperties::MysqlCdc(_) => {
215                            let ty = type_name_to_mysql_type(type_name.as_str());
216                            match ty {
217                                Some(ty) => mysql_type_to_rw_type(&ty).map_err(|err| {
218                                    tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
219                                    AccessError::UnsupportedType {
220                                        ty: type_name.clone(),
221                                    }
222                                })?,
223                                None => {
224                                    Err(AccessError::UnsupportedType { ty: type_name })?
225                                }
226                            }
227                        }
228                        _ => {
229                            unreachable!()
230                        }
231                    };
232
233                    // handle default value expression, currently we only support constant expression
234                    let column_desc = match col.access_object_field("defaultValueExpression") {
235                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
236                            let value_text: Option<String>;
237                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
238                            match *connector_props {
239                                ConnectorProperties::PostgresCdc(_) => {
240                                    // default value of non-number data type will be stored as
241                                    // "'value'::type"
242                                    match default_val_expr_str
243                                        .split("::")
244                                        .map(|s| s.trim_matches('\''))
245                                        .next()
246                                    {
247                                        None => {
248                                            value_text = None;
249                                        }
250                                        Some(val_text) => {
251                                            value_text = Some(val_text.to_owned());
252                                        }
253                                    }
254                                }
255                                ConnectorProperties::MysqlCdc(_) => {
256                                    // mysql timestamp is mapped to timestamptz, we use UTC timezone to
257                                    // interpret its value
258                                    if data_type == DataType::Timestamptz {
259                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
260                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
261                                            AccessError::TypeError {
262                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
263                                                got: data_type.to_string(),
264                                                value: default_val_expr_str.to_owned(),
265                                            }
266                                        })?);
267                                    } else {
268                                        value_text = Some(default_val_expr_str.to_owned());
269                                    }
270                                }
271                                _ => {
272                                    unreachable!("connector doesn't support schema change")
273                                }
274                            }
275
276                            let snapshot_value: Datum = if let Some(value_text) = value_text {
277                                Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
278                                    |err| {
279                                        tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
280                                        AccessError::TypeError {
281                                            expected: "constant expression".into(),
282                                            got: data_type.to_string(),
283                                            value: value_text,
284                                        }
285                                    },
286                                )?)
287                            } else {
288                                None
289                            };
290
291                            if snapshot_value.is_none() {
292                                tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
293                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
294                            } else {
295                                ColumnDesc::named_with_default_value(
296                                    name,
297                                    ColumnId::placeholder(),
298                                    data_type,
299                                    snapshot_value,
300                                )
301                            }
302                        }
303                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
304                    };
305                    column_descs.push(column_desc);
306                }
307            }
308
309            // concatenate the source_id to the cdc_table_id
310            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
311            schema_changes.push(TableSchemaChange {
312                cdc_table_id,
313                columns: column_descs
314                    .into_iter()
315                    .map(|column_desc| ColumnCatalog {
316                        column_desc,
317                        is_hidden: false,
318                    })
319                    .collect_vec(),
320                change_type: ty.as_str().into(),
321                upstream_ddl: upstream_ddl.clone(),
322            });
323        }
324
325        Ok(SchemaChangeEnvelope {
326            table_changes: schema_changes,
327        })
328    } else {
329        Err(AccessError::Undefined {
330            name: "table schema change".into(),
331            path: TABLE_CHANGES.into(),
332        })
333    }
334}
335
336impl<A> DebeziumChangeEvent<A>
337where
338    A: Access,
339{
340    /// Panic: one of the `key_accessor` or `value_accessor` must be provided.
341    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
342        assert!(key_accessor.is_some() || value_accessor.is_some());
343        Self {
344            value_accessor,
345            key_accessor,
346            is_mongodb: false,
347        }
348    }
349
350    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
351        assert!(key_accessor.is_some() || value_accessor.is_some());
352        Self {
353            value_accessor,
354            key_accessor,
355            is_mongodb: true,
356        }
357    }
358
359    /// Returns the transaction metadata if exists.
360    ///
361    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
362    pub(crate) fn transaction_control(
363        &self,
364        connector_props: &ConnectorProperties,
365    ) -> Option<TransactionControl> {
366        // Ignore if `value_accessor` is not provided or there's any error when
367        // trying to parse the transaction metadata.
368        self.value_accessor
369            .as_ref()
370            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
371    }
372}
373
374impl<A> ChangeEvent for DebeziumChangeEvent<A>
375where
376    A: Access,
377{
378    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
379        match self.op()? {
380            ChangeEventOperation::Delete => {
381                // For delete events of MongoDB, the "before" and "after" field both are null in the value,
382                // we need to extract the _id field from the key.
383                if self.is_mongodb && desc.name == "_id" {
384                    return self
385                        .key_accessor
386                        .as_ref()
387                        .expect("key_accessor must be provided for delete operation")
388                        .access(&[&desc.name], &desc.data_type);
389                }
390
391                if let Some(va) = self.value_accessor.as_ref() {
392                    va.access(&[BEFORE, &desc.name], &desc.data_type)
393                } else {
394                    self.key_accessor
395                        .as_ref()
396                        .unwrap()
397                        .access(&[&desc.name], &desc.data_type)
398                }
399            }
400
401            // value should not be None.
402            ChangeEventOperation::Upsert => {
403                // For upsert operation, if desc is an additional column, access field in the `SOURCE` field.
404                desc.additional_column.column_type.as_ref().map_or_else(
405                    || {
406                        self.value_accessor
407                            .as_ref()
408                            .expect("value_accessor must be provided for upsert operation")
409                            .access(&[AFTER, &desc.name], &desc.data_type)
410                    },
411                    |additional_column_type| {
412                        match *additional_column_type {
413                            ColumnType::Timestamp(_) => {
414                                // access payload.source.ts_ms
415                                let ts_ms = self
416                                    .value_accessor
417                                    .as_ref()
418                                    .expect("value_accessor must be provided for upsert operation")
419                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
420                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
421                                    Timestamptz::from_millis(scalar.into_int64())
422                                        .expect("source.ts_ms must in millisecond")
423                                        .to_scalar_value()
424                                })))
425                            }
426                            ColumnType::DatabaseName(_) => self
427                                .value_accessor
428                                .as_ref()
429                                .expect("value_accessor must be provided for upsert operation")
430                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
431                            ColumnType::SchemaName(_) => self
432                                .value_accessor
433                                .as_ref()
434                                .expect("value_accessor must be provided for upsert operation")
435                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
436                            ColumnType::TableName(_) => self
437                                .value_accessor
438                                .as_ref()
439                                .expect("value_accessor must be provided for upsert operation")
440                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
441                            ColumnType::CollectionName(_) => self
442                                .value_accessor
443                                .as_ref()
444                                .expect("value_accessor must be provided for upsert operation")
445                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
446                            _ => Err(AccessError::UnsupportedAdditionalColumn {
447                                name: desc.name.clone(),
448                            }),
449                        }
450                    },
451                )
452            }
453        }
454    }
455
456    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
457        if let Some(accessor) = &self.value_accessor {
458            if let Some(ScalarRefImpl::Utf8(op)) =
459                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
460            {
461                match op {
462                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
463                        return Ok(ChangeEventOperation::Upsert);
464                    }
465                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
466                    _ => (),
467                }
468            }
469            Err(super::AccessError::Undefined {
470                name: "op".into(),
471                path: Default::default(),
472            })
473        } else {
474            Ok(ChangeEventOperation::Delete)
475        }
476    }
477}
478
/// Access support for MongoDB Debezium events.
///
/// For now, only `strong_schema`-typed MongoDB Debezium event JSONs are considered.
pub struct MongoJsonAccess<A> {
    /// The wrapped accessor over the event JSON.
    accessor: A,
    /// The `strong_schema` flag; its exact effect is defined by the `Access` impl of this type.
    strong_schema: bool,
}
486
487pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
488    let id_field = if let Some(value) = bson_doc.get("_id") {
489        value
490    } else {
491        bson_doc
492    };
493
494    let type_error = || AccessError::TypeError {
495        expected: id_type.to_string(),
496        got: match id_field {
497            serde_json::Value::Null => "null",
498            serde_json::Value::Bool(_) => "bool",
499            serde_json::Value::Number(_) => "number",
500            serde_json::Value::String(_) => "string",
501            serde_json::Value::Array(_) => "array",
502            serde_json::Value::Object(_) => "object",
503        }
504        .to_owned(),
505        value: id_field.to_string(),
506    };
507
508    let id: Datum = match id_type {
509        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
510        DataType::Varchar => match id_field {
511            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
512            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
513                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
514            )),
515            _ => return Err(type_error()),
516        },
517        DataType::Int32 => {
518            if let serde_json::Value::Object(obj) = id_field
519                && obj.contains_key("$numberInt")
520            {
521                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
522                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
523            } else {
524                return Err(type_error());
525            }
526        }
527        DataType::Int64 => {
528            if let serde_json::Value::Object(obj) = id_field
529                && obj.contains_key("$numberLong")
530            {
531                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
532                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
533            } else {
534                return Err(type_error());
535            }
536        }
537        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
538    };
539    Ok(id)
540}
541
542/// Extract the field data from the bson document
543///
544/// BSON document is a JSON object with some special fields, such as:
545/// long integer: {"$numberLong": "1630454400000"}
546/// date time: {"$date": {"$numberLong": "1630454400000"}}
547///
548/// For now, we support only the Canonical format of the date and timestamp.
549///
550/// # NOTE:
551///
552/// - `field` indicates the field name in the bson document, if it is None, the `bson_doc` is the field itself.
553// similar to extract the "_id" field from the message payload
554pub fn extract_bson_field(
555    type_expected: &DataType,
556    bson_doc: &serde_json::Value,
557    field: Option<&str>,
558) -> AccessResult {
559    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
560        expected: type_expected.to_string(),
561        got: match bson_doc {
562            serde_json::Value::Null => "null",
563            serde_json::Value::Bool(_) => "bool",
564            serde_json::Value::Number(_) => "number",
565            serde_json::Value::String(_) => "string",
566            serde_json::Value::Array(_) => "array",
567            serde_json::Value::Object(_) => "object",
568        }
569        .to_owned(),
570        value: datum.to_string(),
571    };
572
573    let datum = if field.is_some() {
574        let Some(bson_doc) = bson_doc.get(field.unwrap()) else {
575            return Err(type_error(bson_doc));
576        };
577        bson_doc
578    } else {
579        bson_doc
580    };
581
582    if datum.is_null() {
583        return Ok(None);
584    }
585
586    let field_datum: Datum = match type_expected {
587        DataType::Boolean => {
588            if datum.is_boolean() {
589                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
590            } else {
591                return Err(type_error(datum));
592            }
593        }
594        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
595        DataType::Varchar => match datum {
596            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
597            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
598                obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
599            }
600            _ => return Err(type_error(datum)),
601        },
602        DataType::Int16
603        | DataType::Int32
604        | DataType::Int64
605        | DataType::Int256
606        | DataType::Float32
607        | DataType::Float64 => {
608            if !datum.is_object() {
609                return Err(type_error(datum));
610            };
611
612            bson_extract_number(datum, type_expected)?
613        }
614
615        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
616            if let serde_json::Value::Object(mp) = datum {
617                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
618                    bson_extract_timestamp(datum, type_expected)?
619                } else if mp.contains_key("$date") {
620                    bson_extract_date(datum, type_expected)?
621                } else {
622                    return Err(type_error(datum));
623                }
624            } else {
625                return Err(type_error(datum));
626            }
627        }
628        DataType::Decimal => {
629            if let serde_json::Value::Object(obj) = datum
630                && obj.contains_key("$numberDecimal")
631                && obj["$numberDecimal"].is_string()
632            {
633                let number = obj["$numberDecimal"].as_str().unwrap();
634
635                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
636                    AccessError::TypeError {
637                        expected: type_expected.to_string(),
638                        got: "unparsable string".into(),
639                        value: number.to_owned(),
640                    }
641                })?;
642                Some(ScalarImpl::Decimal(dec))
643            } else {
644                return Err(type_error(datum));
645            }
646        }
647
648        DataType::Bytea => {
649            if let serde_json::Value::Object(obj) = datum
650                && obj.contains_key("$binary")
651                && obj["$binary"].is_object()
652            {
653                use base64::Engine;
654
655                let binary = obj["$binary"].as_object().unwrap();
656
657                if !binary.contains_key("$base64")
658                    || !binary["$base64"].is_string()
659                    || !binary.contains_key("$subType")
660                    || !binary["$subType"].is_string()
661                {
662                    return Err(AccessError::TypeError {
663                        expected: type_expected.to_string(),
664                        got: "object".into(),
665                        value: datum.to_string(),
666                    });
667                }
668
669                let b64_str = binary["$base64"]
670                    .as_str()
671                    .ok_or_else(|| AccessError::TypeError {
672                        expected: type_expected.to_string(),
673                        got: "object".into(),
674                        value: datum.to_string(),
675                    })?;
676
677                // type is not used for now
678                let _type_str =
679                    binary["$subType"]
680                        .as_str()
681                        .ok_or_else(|| AccessError::TypeError {
682                            expected: type_expected.to_string(),
683                            got: "object".into(),
684                            value: datum.to_string(),
685                        })?;
686
687                let bytes = base64::prelude::BASE64_STANDARD
688                    .decode(b64_str)
689                    .map_err(|_| AccessError::TypeError {
690                        expected: "$binary object with $base64 string and $subType string field"
691                            .to_owned(),
692                        got: "string".to_owned(),
693                        value: bson_doc.to_string(),
694                    })?;
695                let bytea = ScalarImpl::Bytea(bytes.into());
696                Some(bytea)
697            } else {
698                return Err(type_error(datum));
699            }
700        }
701
702        DataType::Struct(struct_fields) => {
703            let mut datums = vec![];
704            for (field_name, field_type) in struct_fields.iter() {
705                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
706                datums.push(field_datum);
707            }
708            let value = StructValue::new(datums);
709
710            Some(ScalarImpl::Struct(value))
711        }
712
713        DataType::List(list_type) => {
714            let Some(d_array) = datum.as_array() else {
715                return Err(type_error(datum));
716            };
717
718            let mut builder = list_type.create_array_builder(d_array.len());
719            for item in d_array {
720                builder.append(extract_bson_field(list_type, item, None)?);
721            }
722            Some(ScalarImpl::from(ListValue::new(builder.finish())))
723        }
724
725        _ => {
726            if let Some(field_name) = field {
727                unreachable!(
728                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
729                )
730            } else {
731                let type_expected = type_expected.to_string();
732                unreachable!(
733                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
734                )
735            }
736        }
737    };
738    Ok(field_datum)
739}
740
741fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
742    let field_name = match type_expected {
743        DataType::Int16 => "$numberInt",
744        DataType::Int32 => "$numberInt",
745        DataType::Int64 => "$numberLong",
746        DataType::Int256 => "$numberLong",
747        DataType::Float32 => "$numberDouble",
748        DataType::Float64 => "$numberDouble",
749        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
750    };
751
752    let datum = bson_doc.get(field_name);
753    if datum.is_none() {
754        return Err(AccessError::TypeError {
755            expected: type_expected.to_string(),
756            got: "object".into(),
757            value: bson_doc.to_string(),
758        });
759    }
760
761    let datum = datum.unwrap();
762
763    if datum.is_string() {
764        let Some(num_str) = datum.as_str() else {
765            return Err(AccessError::TypeError {
766                expected: type_expected.to_string(),
767                got: "string".into(),
768                value: datum.to_string(),
769            });
770        };
771        // parse to float
772        if [DataType::Float32, DataType::Float64].contains(type_expected) {
773            match (num_str, type_expected) {
774                ("Infinity", DataType::Float64) => {
775                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
776                }
777                ("Infinity", DataType::Float32) => {
778                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
779                }
780                ("-Infinity", DataType::Float64) => {
781                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
782                }
783                ("-Infinity", DataType::Float32) => {
784                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
785                }
786                ("NaN", DataType::Float64) => {
787                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
788                }
789                ("NaN", DataType::Float32) => {
790                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
791                }
792                _ => {}
793            }
794
795            let parsed_num: f64 = match num_str.parse() {
796                Ok(n) => n,
797                Err(_e) => {
798                    return Err(AccessError::TypeError {
799                        expected: type_expected.to_string(),
800                        got: "string".into(),
801                        value: num_str.to_owned(),
802                    });
803                }
804            };
805            if *type_expected == DataType::Float64 {
806                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
807            } else {
808                let parsed_num = parsed_num as f32;
809                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
810            }
811        }
812        // parse to large int
813        if *type_expected == DataType::Int256 {
814            let parsed_num = match Int256::from_str(num_str) {
815                Ok(n) => n,
816                Err(_) => {
817                    return Err(AccessError::TypeError {
818                        expected: type_expected.to_string(),
819                        got: "string".into(),
820                        value: num_str.to_owned(),
821                    });
822                }
823            };
824            return Ok(Some(ScalarImpl::Int256(parsed_num)));
825        }
826
827        // parse to integer
828        let parsed_num: i64 = match num_str.parse() {
829            Ok(n) => n,
830            Err(_e) => {
831                return Err(AccessError::TypeError {
832                    expected: type_expected.to_string(),
833                    got: "string".into(),
834                    value: num_str.to_owned(),
835                });
836            }
837        };
838        match type_expected {
839            DataType::Int16 => {
840                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
841                    return Err(AccessError::TypeError {
842                        expected: type_expected.to_string(),
843                        got: "string".into(),
844                        value: num_str.to_owned(),
845                    });
846                }
847                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
848            }
849            DataType::Int32 => {
850                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
851                    return Err(AccessError::TypeError {
852                        expected: type_expected.to_string(),
853                        got: "string".into(),
854                        value: num_str.to_owned(),
855                    });
856                }
857                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
858            }
859            DataType::Int64 => {
860                return Ok(Some(ScalarImpl::Int64(parsed_num)));
861            }
862            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
863        }
864    }
865    if datum.is_null() {
866        return Err(AccessError::TypeError {
867            expected: type_expected.to_string(),
868            got: "null".into(),
869            value: bson_doc.to_string(),
870        });
871    }
872
873    if datum.is_array() {
874        return Err(AccessError::TypeError {
875            expected: type_expected.to_string(),
876            got: "array".to_owned(),
877            value: datum.to_string(),
878        });
879    }
880
881    if datum.is_object() {
882        return Err(AccessError::TypeError {
883            expected: type_expected.to_string(),
884            got: "object".to_owned(),
885            value: datum.to_string(),
886        });
887    }
888
889    if datum.is_boolean() {
890        return Err(AccessError::TypeError {
891            expected: type_expected.to_string(),
892            got: "boolean".into(),
893            value: bson_doc.to_string(),
894        });
895    }
896
897    if datum.is_number() {
898        let got_type = if datum.is_f64() { "f64" } else { "i64" };
899        return Err(AccessError::TypeError {
900            expected: type_expected.to_string(),
901            got: got_type.into(),
902            value: bson_doc.to_string(),
903        });
904    }
905
906    Err(AccessError::TypeError {
907        expected: type_expected.to_string(),
908        got: "unknown".into(),
909        value: bson_doc.to_string(),
910    })
911}
912
913fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
914    // according to mongodb extended json v2
915    // the date could be:
916    //
917    // the timestamp type could be:
918    //
919    // both Canonical and Relaxed format:
920    // {"$timestamp": {"t": 1630454400, "i": 1}}
921    //
922    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
923    // date is encoded as number of milliseconds since the Unix epoch
924    //
925    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
926    // date is encoded as ISO8601 string
927
928    let datum = &bson_doc["$date"];
929
930    let type_error = || AccessError::TypeError {
931        expected: type_expected.to_string(),
932        got: match bson_doc {
933            serde_json::Value::Null => "null",
934            serde_json::Value::Bool(_) => "bool",
935            serde_json::Value::Number(_) => "number",
936            serde_json::Value::String(_) => "string",
937            serde_json::Value::Array(_) => "array",
938            serde_json::Value::Object(_) => "object",
939        }
940        .to_owned(),
941        value: datum.to_string(),
942    };
943
944    // deal with the Canonical format only
945    let millis = match datum {
946        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
947        serde_json::Value::Object(obj)
948            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
949        {
950            obj["$numberLong"]
951                .as_str()
952                .unwrap()
953                .parse::<i64>()
954                .map_err(|_| AccessError::TypeError {
955                    expected: "timestamp".into(),
956                    got: "object".into(),
957                    value: datum.to_string(),
958                })?
959        }
960        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
961        serde_json::Value::String(s) => {
962            let dt =
963                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
964                    expected: "valid ISO-8601 date string".into(),
965                    got: "string".into(),
966                    value: datum.to_string(),
967                })?;
968            dt.timestamp_millis()
969        }
970
971        // jsonv1 format
972        // {"$date": 1630454400000}
973        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
974            expected: "timestamp".into(),
975            got: "number".into(),
976            value: datum.to_string(),
977        })?,
978
979        _ => return Err(type_error()),
980    };
981
982    let datetime =
983        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
984            expected: "timestamp".into(),
985            got: "object".into(),
986            value: datum.to_string(),
987        })?;
988
989    let res = match type_expected {
990        DataType::Date => {
991            let naive = datetime.naive_local();
992            let dt = naive.date();
993            Some(ScalarImpl::Date(dt.into()))
994        }
995        DataType::Time => {
996            let naive = datetime.naive_local();
997            let dt = naive.time();
998            Some(ScalarImpl::Time(dt.into()))
999        }
1000        DataType::Timestamp => {
1001            let naive = datetime.naive_local();
1002            let dt = Timestamp::from(naive);
1003            Some(ScalarImpl::Timestamp(dt))
1004        }
1005        DataType::Timestamptz => {
1006            let dt = datetime.into();
1007            Some(ScalarImpl::Timestamptz(dt))
1008        }
1009        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1010    };
1011    Ok(res)
1012}
1013
1014fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1015    // according to mongodb extended json v2
1016    // the date could be:
1017    //
1018    // the timestamp type could be:
1019    //
1020    // both Canonical and Relaxed format:
1021    // {"$timestamp": {"t": 1630454400, "i": 1}}
1022    // t is the number of seconds since the Unix epoch
1023    //
1024    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
1025    // date is encoded as number of milliseconds since the Unix epoch
1026    //
1027    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
1028    // date is encoded as ISO8601 string
1029    //
1030    // *For now, we support the Canonical format only.*
1031
1032    let Some(obj) = bson_doc["$timestamp"].as_object() else {
1033        return Err(AccessError::TypeError {
1034            expected: "timestamp".into(),
1035            got: "object".into(),
1036            value: bson_doc.to_string(),
1037        });
1038    };
1039
1040    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1041    {
1042        return Err(AccessError::TypeError {
1043            expected: "timestamp with valid seconds since epoch".into(),
1044            got: "object".into(),
1045            value: bson_doc.to_string(),
1046        });
1047    }
1048
1049    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1050        expected: "timestamp with valid seconds since epoch".into(),
1051        got: "object".into(),
1052        value: bson_doc.to_string(),
1053    })?;
1054
1055    let chrono_datetime =
1056        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1057            expected: type_expected.to_string(),
1058            got: "object".to_owned(),
1059            value: bson_doc.to_string(),
1060        })?;
1061
1062    let res = match type_expected {
1063        DataType::Date => {
1064            let naive = chrono_datetime.naive_local();
1065            let dt = naive.date();
1066            Some(ScalarImpl::Date(dt.into()))
1067        }
1068        DataType::Time => {
1069            let naive = chrono_datetime.naive_local();
1070            let dt = naive.time();
1071            Some(ScalarImpl::Time(dt.into()))
1072        }
1073        DataType::Timestamp => {
1074            let naive = chrono_datetime.naive_local();
1075            let dt = Timestamp::from(naive);
1076            Some(ScalarImpl::Timestamp(dt))
1077        }
1078        DataType::Timestamptz => {
1079            let dt = chrono_datetime.into();
1080            Some(ScalarImpl::Timestamptz(dt))
1081        }
1082        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1083    };
1084
1085    Ok(res)
1086}
1087
1088impl<A> MongoJsonAccess<A> {
1089    pub fn new(accessor: A, strong_schema: bool) -> Self {
1090        Self {
1091            accessor,
1092            strong_schema,
1093        }
1094    }
1095}
1096
1097impl<A> Access for MongoJsonAccess<A>
1098where
1099    A: Access,
1100{
1101    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1102        match path {
1103            ["after" | "before", "_id"] => {
1104                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1105                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1106                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1107                } else {
1108                    // fail to extract the "_id" field from the message payload
1109                    Err(AccessError::Undefined {
1110                        name: "_id".to_owned(),
1111                        path: path[0].to_owned(),
1112                    })?
1113                }
1114            }
1115
1116            ["after" | "before", "payload"] if !self.strong_schema => {
1117                self.access(&[path[0]], &DataType::Jsonb)
1118            }
1119
1120            ["after" | "before", field] if self.strong_schema => {
1121                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1122                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1123                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1124                } else {
1125                    // fail to extract the expected field from the message payload
1126                    Err(AccessError::Undefined {
1127                        name: field.to_string(),
1128                        path: path[0].to_owned(),
1129                    })?
1130                }
1131            }
1132
1133            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
1134            // In addition, the "_id" field is named as "id" in the key. An example of message key:
1135            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
1136            ["_id"] => {
1137                let ret = self.accessor.access(path, type_expected);
1138                if matches!(ret, Err(AccessError::Undefined { .. })) {
1139                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1140                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1141                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1142                    } else {
1143                        // fail to extract the "_id" field from the message key
1144                        Err(AccessError::Undefined {
1145                            name: "_id".to_owned(),
1146                            path: "id".to_owned(),
1147                        })?
1148                    }
1149                } else {
1150                    ret
1151                }
1152            }
1153            _ => self.accessor.access(path, type_expected),
1154        }
1155    }
1156}