risingwave_connector/parser/unified/debezium.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::types::{
20    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
21    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
22};
23use risingwave_connector_codec::decoder::AccessExt;
24use risingwave_pb::plan_common::additional_column::ColumnType;
25use thiserror_ext::AsReport;
26
27use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
28use crate::parser::TransactionControl;
29use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
30use crate::parser::schema_change::TableChangeType;
31use crate::source::cdc::build_cdc_table_id;
32use crate::source::cdc::external::mysql::{
33    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
34};
35use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
36use crate::source::{ConnectorProperties, SourceColumnDesc};
37
38/// Checks if a given default value expression is a BIGSERIAL default.
39///
40/// Normally, all unsupported expressions should fail in `from_text` parsing.
41/// However, we make a special exception for the `nextval()` function because:
42/// 1. When a user declares a column as BIGSERIAL (usually as the primary key), PostgreSQL automatically generates
43///    a default value expression like `nextval('sequence_name'::regclass)`
44/// 2. This is a very common scenario in real-world usage
45/// 3. For existing columns with `nextval()` default, we skip parsing the default value
46///    to avoid schema change failures
47///
48/// TODO: In the future, if we can distinguish between newly added columns and existing columns,
49/// we should modify this logic to:
50/// - Skip default value parsing for existing columns with `nextval()`
51/// - Report error for newly added columns with `nextval()` (since they should be handled differently)
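///
/// # Examples (illustrative, hypothetical default-value expressions)
///
/// ```text
/// is_bigserial_default("nextval('orders_id_seq'::regclass)") // => true
/// is_bigserial_default("'pending'::character varying")       // => false
/// is_bigserial_default("   ")                                 // => false
/// ```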
52fn is_bigserial_default(default_value_expression: &str) -> bool {
53    if default_value_expression.trim().is_empty() {
54        return false;
55    }
56
57    let expr = default_value_expression.trim();
58
59    // Return true if it is a `nextval()` function (bigserial default).
60    expr.starts_with("nextval(")
61}
62
63// Example of Debezium JSON value:
64// {
65//     "payload":
66//     {
67//         "before": null,
68//         "after":
69//         {
70//             "O_ORDERKEY": 5,
71//             "O_CUSTKEY": 44485,
72//             "O_ORDERSTATUS": "F",
73//             "O_TOTALPRICE": "144659.20",
74//             "O_ORDERDATE": "1994-07-30"
75//         },
76//         "source":
77//         {
78//             "version": "1.9.7.Final",
79//             "connector": "mysql",
80//             "name": "RW_CDC_1002",
81//             "ts_ms": 1695277757000,
82//             "db": "mydb",
83//             "sequence": null,
84//             "table": "orders",
85//             "server_id": 0,
86//             "gtid": null,
87//             "file": "binlog.000008",
88//             "pos": 3693,
89//             "row": 0
90//         },
91//         "op": "r",
92//         "ts_ms": 1695277757017,
93//         "transaction": null
94//     }
95// }
96pub struct DebeziumChangeEvent<A> {
97    value_accessor: Option<A>,
98    key_accessor: Option<A>,
99    is_mongodb: bool,
100}
101
102const BEFORE: &str = "before";
103const AFTER: &str = "after";
104
105const UPSTREAM_DDL: &str = "ddl";
106const SOURCE: &str = "source";
107const SOURCE_TS_MS: &str = "ts_ms";
108const SOURCE_DB: &str = "db";
109const SOURCE_SCHEMA: &str = "schema";
110const SOURCE_TABLE: &str = "table";
111const SOURCE_COLLECTION: &str = "collection";
112
113const OP: &str = "op";
114pub const TRANSACTION_STATUS: &str = "status";
115pub const TRANSACTION_ID: &str = "id";
116
117pub const TABLE_CHANGES: &str = "tableChanges";
118
119pub const DEBEZIUM_READ_OP: &str = "r";
120pub const DEBEZIUM_CREATE_OP: &str = "c";
121pub const DEBEZIUM_UPDATE_OP: &str = "u";
122pub const DEBEZIUM_DELETE_OP: &str = "d";
123
124pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
125pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
126
127pub fn parse_transaction_meta(
128    accessor: &impl Access,
129    connector_props: &ConnectorProperties,
130) -> AccessResult<TransactionControl> {
131    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
132        accessor
133            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
134            .to_datum_ref(),
135        accessor
136            .access(&[TRANSACTION_ID], &DataType::Varchar)?
137            .to_datum_ref(),
138    ) {
139        // The id field has different meanings for different databases:
140        // PG: txID:LSN
141        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
142        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
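        // For example (hypothetical values): a Postgres BEGIN message with id "588:0/2D9F0B8"
        // yields `TransactionControl::Begin { id: "588" }`, i.e. only the part before the first
        // ':' is kept, while MySQL and SQL Server ids are passed through unchanged.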
143        match status {
144            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
145                ConnectorProperties::PostgresCdc(_) => {
146                    let (tx_id, _) = id.split_once(':').unwrap();
147                    return Ok(TransactionControl::Begin { id: tx_id.into() });
148                }
149                ConnectorProperties::MysqlCdc(_) => {
150                    return Ok(TransactionControl::Begin { id: id.into() });
151                }
152                ConnectorProperties::SqlServerCdc(_) => {
153                    return Ok(TransactionControl::Begin { id: id.into() });
154                }
155                _ => {}
156            },
157            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
158                ConnectorProperties::PostgresCdc(_) => {
159                    let (tx_id, _) = id.split_once(':').unwrap();
160                    return Ok(TransactionControl::Commit { id: tx_id.into() });
161                }
162                ConnectorProperties::MysqlCdc(_) => {
163                    return Ok(TransactionControl::Commit { id: id.into() });
164                }
165                ConnectorProperties::SqlServerCdc(_) => {
166                    return Ok(TransactionControl::Commit { id: id.into() });
167                }
168                _ => {}
169            },
170            _ => {}
171        }
172    }
173
174    Err(AccessError::Undefined {
175        name: "transaction status".into(),
176        path: TRANSACTION_STATUS.into(),
177    })
178}
179
180macro_rules! jsonb_access_field {
181    ($col:expr, $field:expr, $as_type:tt) => {
182        $crate::paste! {
183            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
184        }
185    };
186}
187
188/// Parse the schema change message from Debezium.
189/// For the layout of the MySQL schema change message, refer to
190/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
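///
/// An abridged, illustrative example of the fields this function reads (values are hypothetical):
///
/// ```json
/// {
///     "ddl": "ALTER TABLE orders ADD COLUMN note varchar(255)",
///     "tableChanges": [
///         {
///             "id": "\"mydb\".\"orders\"",
///             "type": "ALTER",
///             "table": {
///                 "columns": [
///                     {
///                         "name": "note",
///                         "typeName": "VARCHAR",
///                         "enumValues": null,
///                         "defaultValueExpression": null
///                     }
///                 ]
///             }
///         }
///     ]
/// }
/// ```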
191pub fn parse_schema_change(
192    accessor: &impl Access,
193    source_id: u32,
194    source_name: &str,
195    connector_props: &ConnectorProperties,
196) -> AccessResult<SchemaChangeEnvelope> {
197    let mut schema_changes = vec![];
198
199    let upstream_ddl: String = accessor
200        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
201        .to_owned_datum()
202        .unwrap()
203        .as_utf8()
204        .to_string();
205
206    if let Some(ScalarRefImpl::List(table_changes)) = accessor
207        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
208        .to_datum_ref()
209    {
210        for datum in table_changes.iter() {
211            let jsonb = match datum {
212                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
213                _ => unreachable!("`tableChanges` elements must be jsonb"),
214            };
215            let id: String = jsonb_access_field!(jsonb, "id", string);
216            let ty = jsonb_access_field!(jsonb, "type", string);
217
218            let table_name = id.trim_matches('"').to_owned();
219            let ddl_type: TableChangeType = ty.as_str().into();
220            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
221                tracing::debug!("skip table schema change for create/drop command");
222                continue;
223            }
224
225            let mut column_descs: Vec<ColumnDesc> = vec![];
226            if let Some(table) = jsonb.access_object_field("table")
227                && let Some(columns) = table.access_object_field("columns")
228            {
229                for col in columns.array_elements().unwrap() {
230                    let name = jsonb_access_field!(col, "name", string);
231                    let type_name = jsonb_access_field!(col, "typeName", string);
232                    // Determine if this column is an enum type
233                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
234                    let data_type = match *connector_props {
235                        ConnectorProperties::PostgresCdc(_) => {
236                            let ty = type_name_to_pg_type(type_name.as_str());
237                            if is_enum {
238                                tracing::debug!(target: "auto_schema_change",
239                                    "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
240                                DataType::Varchar
241                            } else {
242                                match ty {
243                                    Some(ty) => match pg_type_to_rw_type(&ty) {
244                                        Ok(data_type) => data_type,
245                                        Err(err) => {
246                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
247                                            return Err(AccessError::CdcAutoSchemaChangeError {
248                                                ty: type_name,
249                                                table_name: format!(
250                                                    "{}.{}",
251                                                    source_name, table_name
252                                                ),
253                                            });
254                                        }
255                                    },
256                                    None => {
257                                        return Err(AccessError::CdcAutoSchemaChangeError {
258                                            ty: type_name,
259                                            table_name: format!("{}.{}", source_name, table_name),
260                                        });
261                                    }
262                                }
263                            }
264                        }
265                        ConnectorProperties::MysqlCdc(_) => {
266                            let ty = type_name_to_mysql_type(type_name.as_str());
267                            match ty {
268                                Some(ty) => match mysql_type_to_rw_type(&ty) {
269                                    Ok(data_type) => data_type,
270                                    Err(err) => {
271                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
272                                        return Err(AccessError::CdcAutoSchemaChangeError {
273                                            ty: type_name,
274                                            table_name: format!("{}.{}", source_name, table_name),
275                                        });
276                                    }
277                                },
278                                None => {
279                                    return Err(AccessError::CdcAutoSchemaChangeError {
280                                        ty: type_name,
281                                        table_name: format!("{}.{}", source_name, table_name),
282                                    });
283                                }
284                            }
285                        }
286                        _ => {
287                            unreachable!()
288                        }
289                    };
290
291                    // Handle the default value expression; currently only constant expressions are supported.
292                    let column_desc = match col.access_object_field("defaultValueExpression") {
293                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
294                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
295                            // Only process constant default values
296                            if is_bigserial_default(default_val_expr_str) {
297                                tracing::warn!(target: "auto_schema_change",
298                                    "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
299                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
300                            } else {
301                                let value_text: Option<String>;
302                                match *connector_props {
303                                    ConnectorProperties::PostgresCdc(_) => {
304                                        // default value of non-number data type will be stored as
305                                        // "'value'::type"
306                                        match default_val_expr_str
307                                            .split("::")
308                                            .map(|s| s.trim_matches('\''))
309                                            .next()
310                                        {
311                                            None => {
312                                                value_text = None;
313                                            }
314                                            Some(val_text) => {
315                                                value_text = Some(val_text.to_owned());
316                                            }
317                                        }
318                                    }
319                                    ConnectorProperties::MysqlCdc(_) => {
320                                        // MySQL TIMESTAMP is mapped to timestamptz; we use the UTC timezone
321                                        // to interpret its value.
322                                        if data_type == DataType::Timestamptz {
323                                            value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
324                                                tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
325                                                AccessError::TypeError {
326                                                    expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
327                                                    got: data_type.to_string(),
328                                                    value: default_val_expr_str.to_owned(),
329                                                }
330                                            })?);
331                                        } else {
332                                            value_text = Some(default_val_expr_str.to_owned());
333                                        }
334                                    }
335                                    _ => {
336                                        unreachable!("connector doesn't support schema change")
337                                    }
338                                }
339
340                                let snapshot_value: Datum = if let Some(value_text) = value_text {
341                                    Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
342                                        |err| {
343                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
344                                            AccessError::TypeError {
345                                                expected: "constant expression".into(),
346                                                got: data_type.to_string(),
347                                                value: value_text,
348                                            }
349                                        },
350                                    )?)
351                                } else {
352                                    None
353                                };
354
355                                if snapshot_value.is_none() {
356                                    tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
357                                    ColumnDesc::named(name, ColumnId::placeholder(), data_type)
358                                } else {
359                                    ColumnDesc::named_with_default_value(
360                                        name,
361                                        ColumnId::placeholder(),
362                                        data_type,
363                                        snapshot_value,
364                                    )
365                                }
366                            }
367                        }
368                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
369                    };
370                    column_descs.push(column_desc);
371                }
372            }
373
374            // Build the cdc_table_id by prefixing the table name with the source id.
375            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
376            schema_changes.push(TableSchemaChange {
377                cdc_table_id,
378                columns: column_descs
379                    .into_iter()
380                    .map(|column_desc| ColumnCatalog {
381                        column_desc,
382                        is_hidden: false,
383                    })
384                    .collect_vec(),
385                change_type: ty.as_str().into(),
386                upstream_ddl: upstream_ddl.clone(),
387            });
388        }
389
390        Ok(SchemaChangeEnvelope {
391            table_changes: schema_changes,
392        })
393    } else {
394        Err(AccessError::Undefined {
395            name: "table schema change".into(),
396            path: TABLE_CHANGES.into(),
397        })
398    }
399}
400
401impl<A> DebeziumChangeEvent<A>
402where
403    A: Access,
404{
405    /// Panics if neither `key_accessor` nor `value_accessor` is provided.
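    ///
    /// A typical, illustrative flow (assuming some `Access` implementation built from the
    /// Debezium value payload):
    ///
    /// ```text
    /// let event = DebeziumChangeEvent::new(None, Some(value_accessor));
    /// match event.op()? {
    ///     ChangeEventOperation::Upsert => { /* read each column via event.access_field(desc)? */ }
    ///     ChangeEventOperation::Delete => { /* read the "before" image, or fall back to the key */ }
    /// }
    /// ```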
406    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
407        assert!(key_accessor.is_some() || value_accessor.is_some());
408        Self {
409            value_accessor,
410            key_accessor,
411            is_mongodb: false,
412        }
413    }
414
415    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
416        assert!(key_accessor.is_some() || value_accessor.is_some());
417        Self {
418            value_accessor,
419            key_accessor,
420            is_mongodb: true,
421        }
422    }
423
424    /// Returns the transaction metadata if it exists.
425    ///
426    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
427    pub(crate) fn transaction_control(
428        &self,
429        connector_props: &ConnectorProperties,
430    ) -> Option<TransactionControl> {
431        // Ignore if `value_accessor` is not provided or there's any error when
432        // trying to parse the transaction metadata.
433        self.value_accessor
434            .as_ref()
435            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
436    }
437}
438
439impl<A> ChangeEvent for DebeziumChangeEvent<A>
440where
441    A: Access,
442{
443    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
444        match self.op()? {
445            ChangeEventOperation::Delete => {
446                // For MongoDB delete events, both the "before" and "after" fields are null in the value,
447                // so we need to extract the `_id` field from the key.
448                if self.is_mongodb && desc.name == "_id" {
449                    return self
450                        .key_accessor
451                        .as_ref()
452                        .expect("key_accessor must be provided for delete operation")
453                        .access(&[&desc.name], &desc.data_type);
454                }
455
456                if let Some(va) = self.value_accessor.as_ref() {
457                    va.access(&[BEFORE, &desc.name], &desc.data_type)
458                } else {
459                    self.key_accessor
460                        .as_ref()
461                        .unwrap()
462                        .access(&[&desc.name], &desc.data_type)
463                }
464            }
465
466            // value should not be None.
467            ChangeEventOperation::Upsert => {
468                // For an upsert operation, if `desc` is an additional column, access the value under the `SOURCE` field instead.
469                desc.additional_column.column_type.as_ref().map_or_else(
470                    || {
471                        self.value_accessor
472                            .as_ref()
473                            .expect("value_accessor must be provided for upsert operation")
474                            .access(&[AFTER, &desc.name], &desc.data_type)
475                    },
476                    |additional_column_type| {
477                        match *additional_column_type {
478                            ColumnType::Timestamp(_) => {
479                                // access payload.source.ts_ms
480                                let ts_ms = self
481                                    .value_accessor
482                                    .as_ref()
483                                    .expect("value_accessor must be provided for upsert operation")
484                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
485                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
486                                    Timestamptz::from_millis(scalar.into_int64())
487                                        .expect("source.ts_ms must be in milliseconds")
488                                        .to_scalar_value()
489                                })))
490                            }
491                            ColumnType::DatabaseName(_) => self
492                                .value_accessor
493                                .as_ref()
494                                .expect("value_accessor must be provided for upsert operation")
495                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
496                            ColumnType::SchemaName(_) => self
497                                .value_accessor
498                                .as_ref()
499                                .expect("value_accessor must be provided for upsert operation")
500                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
501                            ColumnType::TableName(_) => self
502                                .value_accessor
503                                .as_ref()
504                                .expect("value_accessor must be provided for upsert operation")
505                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
506                            ColumnType::CollectionName(_) => self
507                                .value_accessor
508                                .as_ref()
509                                .expect("value_accessor must be provided for upsert operation")
510                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
511                            _ => Err(AccessError::UnsupportedAdditionalColumn {
512                                name: desc.name.clone(),
513                            }),
514                        }
515                    },
516                )
517            }
518        }
519    }
520
521    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
522        if let Some(accessor) = &self.value_accessor {
523            if let Some(ScalarRefImpl::Utf8(op)) =
524                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
525            {
526                match op {
527                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
528                        return Ok(ChangeEventOperation::Upsert);
529                    }
530                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
531                    _ => (),
532                }
533            }
534            Err(super::AccessError::Undefined {
535                name: "op".into(),
536                path: Default::default(),
537            })
538        } else {
539            Ok(ChangeEventOperation::Delete)
540        }
541    }
542}
543
544/// Access support for MongoDB.
545///
546/// For now, we consider only `strong_schema`-typed MongoDB Debezium event JSON.
547pub struct MongoJsonAccess<A> {
548    accessor: A,
549    strong_schema: bool,
550}
551
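/// Extract the `_id` datum from a BSON document (if the document has an `_id` key, that field is
/// used; otherwise the document itself is treated as the id).
///
/// Illustrative accepted encodings for the `_id` column (hypothetical values):
/// - `Varchar`: `"abc"` or `{"$oid": "65bc9fb6c485f419a7a877fe"}`
/// - `Int32`: `{"$numberInt": "42"}`
/// - `Int64`: `{"$numberLong": "42"}`
/// - `Jsonb`: any JSON value, stored verbatim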
552pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
553    let id_field = if let Some(value) = bson_doc.get("_id") {
554        value
555    } else {
556        bson_doc
557    };
558
559    let type_error = || AccessError::TypeError {
560        expected: id_type.to_string(),
561        got: match id_field {
562            serde_json::Value::Null => "null",
563            serde_json::Value::Bool(_) => "bool",
564            serde_json::Value::Number(_) => "number",
565            serde_json::Value::String(_) => "string",
566            serde_json::Value::Array(_) => "array",
567            serde_json::Value::Object(_) => "object",
568        }
569        .to_owned(),
570        value: id_field.to_string(),
571    };
572
573    let id: Datum = match id_type {
574        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
575        DataType::Varchar => match id_field {
576            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
577            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
578                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
579            )),
580            _ => return Err(type_error()),
581        },
582        DataType::Int32 => {
583            if let serde_json::Value::Object(obj) = id_field
584                && obj.contains_key("$numberInt")
585            {
586                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
587                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
588            } else {
589                return Err(type_error());
590            }
591        }
592        DataType::Int64 => {
593            if let serde_json::Value::Object(obj) = id_field
594                && obj.contains_key("$numberLong")
595            {
596                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
597                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
598            } else {
599                return Err(type_error());
600            }
601        }
602        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
603    };
604    Ok(id)
605}
606
607/// Extract the field data from the BSON document.
608///
609/// A BSON document is represented as a JSON object with some special fields (MongoDB Extended JSON), e.g.:
610/// - long integer: `{"$numberLong": "1630454400000"}`
611/// - date time: `{"$date": {"$numberLong": "1630454400000"}}`
612///
613/// Dates are accepted in the Canonical form (with Relaxed and legacy v1 fallbacks); timestamps use the `$timestamp` form.
614///
615/// # NOTE:
616///
617/// - `field` indicates the field name in the BSON document; if it is `None`, `bson_doc` itself is treated as the field.
618// This is similar to extracting the `_id` field from the message payload.
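//
// Illustrative strong-schema document (hypothetical values):
//   {"name": "alice", "age": {"$numberInt": "30"}, "created_at": {"$date": {"$numberLong": "1630454400000"}}}
// With `field = Some("age")` and `type_expected = DataType::Int32`, this yields `ScalarImpl::Int32(30)`.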
619pub fn extract_bson_field(
620    type_expected: &DataType,
621    bson_doc: &serde_json::Value,
622    field: Option<&str>,
623) -> AccessResult {
624    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
625        expected: type_expected.to_string(),
626        got: match bson_doc {
627            serde_json::Value::Null => "null",
628            serde_json::Value::Bool(_) => "bool",
629            serde_json::Value::Number(_) => "number",
630            serde_json::Value::String(_) => "string",
631            serde_json::Value::Array(_) => "array",
632            serde_json::Value::Object(_) => "object",
633        }
634        .to_owned(),
635        value: datum.to_string(),
636    };
637
638    let datum = if let Some(field) = field {
639        let Some(bson_doc) = bson_doc.get(field) else {
640            return Err(type_error(bson_doc));
641        };
642        bson_doc
643    } else {
644        bson_doc
645    };
646
647    if datum.is_null() {
648        return Ok(None);
649    }
650
651    let field_datum: Datum = match type_expected {
652        DataType::Boolean => {
653            if datum.is_boolean() {
654                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
655            } else {
656                return Err(type_error(datum));
657            }
658        }
659        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
660        DataType::Varchar => match datum {
661            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
662            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
663                obj["$oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
664            }
665            _ => return Err(type_error(datum)),
666        },
667        DataType::Int16
668        | DataType::Int32
669        | DataType::Int64
670        | DataType::Int256
671        | DataType::Float32
672        | DataType::Float64 => {
673            if !datum.is_object() {
674                return Err(type_error(datum));
675            };
676
677            bson_extract_number(datum, type_expected)?
678        }
679
680        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
681            if let serde_json::Value::Object(mp) = datum {
682                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
683                    bson_extract_timestamp(datum, type_expected)?
684                } else if mp.contains_key("$date") {
685                    bson_extract_date(datum, type_expected)?
686                } else {
687                    return Err(type_error(datum));
688                }
689            } else {
690                return Err(type_error(datum));
691            }
692        }
693        DataType::Decimal => {
694            if let serde_json::Value::Object(obj) = datum
695                && obj.contains_key("$numberDecimal")
696                && obj["$numberDecimal"].is_string()
697            {
698                let number = obj["$numberDecimal"].as_str().unwrap();
699
700                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
701                    AccessError::TypeError {
702                        expected: type_expected.to_string(),
703                        got: "unparsable string".into(),
704                        value: number.to_owned(),
705                    }
706                })?;
707                Some(ScalarImpl::Decimal(dec))
708            } else {
709                return Err(type_error(datum));
710            }
711        }
712
713        DataType::Bytea => {
714            if let serde_json::Value::Object(obj) = datum
715                && obj.contains_key("$binary")
716                && obj["$binary"].is_object()
717            {
718                use base64::Engine;
719
720                let binary = obj["$binary"].as_object().unwrap();
721
722                if !binary.contains_key("$base64")
723                    || !binary["$base64"].is_string()
724                    || !binary.contains_key("$subType")
725                    || !binary["$subType"].is_string()
726                {
727                    return Err(AccessError::TypeError {
728                        expected: type_expected.to_string(),
729                        got: "object".into(),
730                        value: datum.to_string(),
731                    });
732                }
733
734                let b64_str = binary["$base64"]
735                    .as_str()
736                    .ok_or_else(|| AccessError::TypeError {
737                        expected: type_expected.to_string(),
738                        got: "object".into(),
739                        value: datum.to_string(),
740                    })?;
741
742                // type is not used for now
743                let _type_str =
744                    binary["$subType"]
745                        .as_str()
746                        .ok_or_else(|| AccessError::TypeError {
747                            expected: type_expected.to_string(),
748                            got: "object".into(),
749                            value: datum.to_string(),
750                        })?;
751
752                let bytes = base64::prelude::BASE64_STANDARD
753                    .decode(b64_str)
754                    .map_err(|_| AccessError::TypeError {
755                        expected: "$binary object with $base64 string and $subType string field"
756                            .to_owned(),
757                        got: "string".to_owned(),
758                        value: bson_doc.to_string(),
759                    })?;
760                let bytea = ScalarImpl::Bytea(bytes.into());
761                Some(bytea)
762            } else {
763                return Err(type_error(datum));
764            }
765        }
766
767        DataType::Struct(struct_fields) => {
768            let mut datums = vec![];
769            for (field_name, field_type) in struct_fields.iter() {
770                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
771                datums.push(field_datum);
772            }
773            let value = StructValue::new(datums);
774
775            Some(ScalarImpl::Struct(value))
776        }
777
778        DataType::List(list_type) => {
779            let elem_type = list_type.elem();
780            let Some(d_array) = datum.as_array() else {
781                return Err(type_error(datum));
782            };
783
784            let mut builder = elem_type.create_array_builder(d_array.len());
785            for item in d_array {
786                builder.append(extract_bson_field(elem_type, item, None)?);
787            }
788            Some(ScalarImpl::from(ListValue::new(builder.finish())))
789        }
790
791        _ => {
792            if let Some(field_name) = field {
793                unreachable!(
794                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
795                )
796            } else {
797                let type_expected = type_expected.to_string();
798                unreachable!(
799                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
800                )
801            }
802        }
803    };
804    Ok(field_datum)
805}
806
807fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
808    let field_name = match type_expected {
809        DataType::Int16 => "$numberInt",
810        DataType::Int32 => "$numberInt",
811        DataType::Int64 => "$numberLong",
812        DataType::Int256 => "$numberLong",
813        DataType::Float32 => "$numberDouble",
814        DataType::Float64 => "$numberDouble",
815        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
816    };
817
818    let datum = bson_doc.get(field_name);
819    if datum.is_none() {
820        return Err(AccessError::TypeError {
821            expected: type_expected.to_string(),
822            got: "object".into(),
823            value: bson_doc.to_string(),
824        });
825    }
826
827    let datum = datum.unwrap();
828
829    if datum.is_string() {
830        let Some(num_str) = datum.as_str() else {
831            return Err(AccessError::TypeError {
832                expected: type_expected.to_string(),
833                got: "string".into(),
834                value: datum.to_string(),
835            });
836        };
837        // parse to float
838        if [DataType::Float32, DataType::Float64].contains(type_expected) {
839            match (num_str, type_expected) {
840                ("Infinity", DataType::Float64) => {
841                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
842                }
843                ("Infinity", DataType::Float32) => {
844                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
845                }
846                ("-Infinity", DataType::Float64) => {
847                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
848                }
849                ("-Infinity", DataType::Float32) => {
850                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
851                }
852                ("NaN", DataType::Float64) => {
853                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
854                }
855                ("NaN", DataType::Float32) => {
856                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
857                }
858                _ => {}
859            }
860
861            let parsed_num: f64 = match num_str.parse() {
862                Ok(n) => n,
863                Err(_e) => {
864                    return Err(AccessError::TypeError {
865                        expected: type_expected.to_string(),
866                        got: "string".into(),
867                        value: num_str.to_owned(),
868                    });
869                }
870            };
871            if *type_expected == DataType::Float64 {
872                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
873            } else {
874                let parsed_num = parsed_num as f32;
875                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
876            }
877        }
878        // parse to large int
879        if *type_expected == DataType::Int256 {
880            let parsed_num = match Int256::from_str(num_str) {
881                Ok(n) => n,
882                Err(_) => {
883                    return Err(AccessError::TypeError {
884                        expected: type_expected.to_string(),
885                        got: "string".into(),
886                        value: num_str.to_owned(),
887                    });
888                }
889            };
890            return Ok(Some(ScalarImpl::Int256(parsed_num)));
891        }
892
893        // parse to integer
894        let parsed_num: i64 = match num_str.parse() {
895            Ok(n) => n,
896            Err(_e) => {
897                return Err(AccessError::TypeError {
898                    expected: type_expected.to_string(),
899                    got: "string".into(),
900                    value: num_str.to_owned(),
901                });
902            }
903        };
904        match type_expected {
905            DataType::Int16 => {
906                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
907                    return Err(AccessError::TypeError {
908                        expected: type_expected.to_string(),
909                        got: "string".into(),
910                        value: num_str.to_owned(),
911                    });
912                }
913                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
914            }
915            DataType::Int32 => {
916                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
917                    return Err(AccessError::TypeError {
918                        expected: type_expected.to_string(),
919                        got: "string".into(),
920                        value: num_str.to_owned(),
921                    });
922                }
923                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
924            }
925            DataType::Int64 => {
926                return Ok(Some(ScalarImpl::Int64(parsed_num)));
927            }
928            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
929        }
930    }
931    if datum.is_null() {
932        return Err(AccessError::TypeError {
933            expected: type_expected.to_string(),
934            got: "null".into(),
935            value: bson_doc.to_string(),
936        });
937    }
938
939    if datum.is_array() {
940        return Err(AccessError::TypeError {
941            expected: type_expected.to_string(),
942            got: "array".to_owned(),
943            value: datum.to_string(),
944        });
945    }
946
947    if datum.is_object() {
948        return Err(AccessError::TypeError {
949            expected: type_expected.to_string(),
950            got: "object".to_owned(),
951            value: datum.to_string(),
952        });
953    }
954
955    if datum.is_boolean() {
956        return Err(AccessError::TypeError {
957            expected: type_expected.to_string(),
958            got: "boolean".into(),
959            value: bson_doc.to_string(),
960        });
961    }
962
963    if datum.is_number() {
964        let got_type = if datum.is_f64() { "f64" } else { "i64" };
965        return Err(AccessError::TypeError {
966            expected: type_expected.to_string(),
967            got: got_type.into(),
968            value: bson_doc.to_string(),
969        });
970    }
971
972    Err(AccessError::TypeError {
973        expected: type_expected.to_string(),
974        got: "unknown".into(),
975        value: bson_doc.to_string(),
976    })
977}
978
979fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
980    // According to MongoDB Extended JSON, the `$date` value can take one of three forms,
981    // all of which are handled in the match below:
982    //
983    // Canonical (v2): {"$date": {"$numberLong": "1630454400000"}}
984    // the date is encoded as the number of milliseconds since the Unix epoch
985    //
986    // Relaxed (v2): {"$date": "2021-09-01T00:00:00.000Z"}
987    // the date is encoded as an ISO-8601 string
988    //
989    // Legacy (v1): {"$date": 1630454400000}
990    // the date is encoded as a plain integer of milliseconds since the Unix epoch
991    //
992    // The `$timestamp` form ({"$timestamp": {"t": ..., "i": ...}}) is handled in `bson_extract_timestamp`.
993
994    let datum = &bson_doc["$date"];
995
996    let type_error = || AccessError::TypeError {
997        expected: type_expected.to_string(),
998        got: match bson_doc {
999            serde_json::Value::Null => "null",
1000            serde_json::Value::Bool(_) => "bool",
1001            serde_json::Value::Number(_) => "number",
1002            serde_json::Value::String(_) => "string",
1003            serde_json::Value::Array(_) => "array",
1004            serde_json::Value::Object(_) => "object",
1005        }
1006        .to_owned(),
1007        value: datum.to_string(),
1008    };
1009
1010    // Extract the epoch milliseconds from any of the supported forms.
1011    let millis = match datum {
1012        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
1013        serde_json::Value::Object(obj)
1014            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1015        {
1016            obj["$numberLong"]
1017                .as_str()
1018                .unwrap()
1019                .parse::<i64>()
1020                .map_err(|_| AccessError::TypeError {
1021                    expected: "timestamp".into(),
1022                    got: "object".into(),
1023                    value: datum.to_string(),
1024                })?
1025        }
1026        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
1027        serde_json::Value::String(s) => {
1028            let dt =
1029                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1030                    expected: "valid ISO-8601 date string".into(),
1031                    got: "string".into(),
1032                    value: datum.to_string(),
1033                })?;
1034            dt.timestamp_millis()
1035        }
1036
1037        // jsonv1 format
1038        // {"$date": 1630454400000}
1039        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1040            expected: "timestamp".into(),
1041            got: "number".into(),
1042            value: datum.to_string(),
1043        })?,
1044
1045        _ => return Err(type_error()),
1046    };
1047
1048    let datetime =
1049        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1050            expected: "timestamp".into(),
1051            got: "object".into(),
1052            value: datum.to_string(),
1053        })?;
1054
1055    let res = match type_expected {
1056        DataType::Date => {
1057            let naive = datetime.naive_local();
1058            let dt = naive.date();
1059            Some(ScalarImpl::Date(dt.into()))
1060        }
1061        DataType::Time => {
1062            let naive = datetime.naive_local();
1063            let dt = naive.time();
1064            Some(ScalarImpl::Time(dt.into()))
1065        }
1066        DataType::Timestamp => {
1067            let naive = datetime.naive_local();
1068            let dt = Timestamp::from(naive);
1069            Some(ScalarImpl::Timestamp(dt))
1070        }
1071        DataType::Timestamptz => {
1072            let dt = datetime.into();
1073            Some(ScalarImpl::Timestamptz(dt))
1074        }
1075        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1076    };
1077    Ok(res)
1078}
1079
1080fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1081    // According to MongoDB Extended JSON v2, the BSON timestamp type has a single
1082    // representation shared by the Canonical and Relaxed formats:
1083    //
1084    // {"$timestamp": {"t": 1630454400, "i": 1}}
1085    //
1086    // where `t` is the number of seconds since the Unix epoch and `i` is an increment
1087    // that orders operations within the same second. Only `t` is used here; `i` is
1088    // validated to be present but otherwise ignored.
1089    //
1090    // Date values ({"$date": ...}) are handled separately in `bson_extract_date`:
1091    //
1092    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
1093    // the date is encoded as the number of milliseconds since the Unix epoch
1094    //
1095    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
1096    // the date is encoded as an ISO-8601 string
1097
1098    let Some(obj) = bson_doc["$timestamp"].as_object() else {
1099        return Err(AccessError::TypeError {
1100            expected: "timestamp".into(),
1101            got: "object".into(),
1102            value: bson_doc.to_string(),
1103        });
1104    };
1105
1106    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1107    {
1108        return Err(AccessError::TypeError {
1109            expected: "timestamp with valid seconds since epoch".into(),
1110            got: "object".into(),
1111            value: bson_doc.to_string(),
1112        });
1113    }
1114
1115    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1116        expected: "timestamp with valid seconds since epoch".into(),
1117        got: "object".into(),
1118        value: bson_doc.to_string(),
1119    })?;
1120
1121    let chrono_datetime =
1122        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1123            expected: type_expected.to_string(),
1124            got: "object".to_owned(),
1125            value: bson_doc.to_string(),
1126        })?;
1127
1128    let res = match type_expected {
1129        DataType::Date => {
1130            let naive = chrono_datetime.naive_local();
1131            let dt = naive.date();
1132            Some(ScalarImpl::Date(dt.into()))
1133        }
1134        DataType::Time => {
1135            let naive = chrono_datetime.naive_local();
1136            let dt = naive.time();
1137            Some(ScalarImpl::Time(dt.into()))
1138        }
1139        DataType::Timestamp => {
1140            let naive = chrono_datetime.naive_local();
1141            let dt = Timestamp::from(naive);
1142            Some(ScalarImpl::Timestamp(dt))
1143        }
1144        DataType::Timestamptz => {
1145            let dt = chrono_datetime.into();
1146            Some(ScalarImpl::Timestamptz(dt))
1147        }
1148        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1149    };
1150
1151    Ok(res)
1152}
1153
1154impl<A> MongoJsonAccess<A> {
1155    pub fn new(accessor: A, strong_schema: bool) -> Self {
1156        Self {
1157            accessor,
1158            strong_schema,
1159        }
1160    }
1161}
1162
1163impl<A> Access for MongoJsonAccess<A>
1164where
1165    A: Access,
1166{
1167    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1168        match path {
1169            ["after" | "before", "_id"] => {
1170                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1171                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1172                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1173                } else {
1174                    // fail to extract the "_id" field from the message payload
1175                    Err(AccessError::Undefined {
1176                        name: "_id".to_owned(),
1177                        path: path[0].to_owned(),
1178                    })?
1179                }
1180            }
1181
1182            ["after" | "before", "payload"] if !self.strong_schema => {
1183                self.access(&[path[0]], &DataType::Jsonb)
1184            }
1185
1186            ["after" | "before", field] if self.strong_schema => {
1187                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1188                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1189                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1190                } else {
1191                    // fail to extract the expected field from the message payload
1192                    Err(AccessError::Undefined {
1193                        name: field.to_string(),
1194                        path: path[0].to_owned(),
1195                    })?
1196                }
1197            }
1198
1199            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
1200            // In addition, the "_id" field is named as "id" in the key. An example of message key:
1201            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
1202            ["_id"] => {
1203                let ret = self.accessor.access(path, type_expected);
1204                if matches!(ret, Err(AccessError::Undefined { .. })) {
1205                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1206                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1207                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1208                    } else {
1209                        // fail to extract the "_id" field from the message key
1210                        Err(AccessError::Undefined {
1211                            name: "_id".to_owned(),
1212                            path: "id".to_owned(),
1213                        })?
1214                    }
1215                } else {
1216                    ret
1217                }
1218            }
1219            _ => self.accessor.access(path, type_expected),
1220        }
1221    }
1222}
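
// A minimal, illustrative test sketch for the helpers above (hypothetical inputs,
// not an exhaustive suite).
#[cfg(test)]
mod tests {
    use risingwave_common::types::{DataType, ScalarImpl};

    use super::*;

    #[test]
    fn detects_bigserial_default() {
        // `nextval(...)` defaults are recognized; constants and blank expressions are not.
        assert!(is_bigserial_default("nextval('orders_id_seq'::regclass)"));
        assert!(!is_bigserial_default("'pending'::character varying"));
        assert!(!is_bigserial_default("   "));
    }

    #[test]
    fn extracts_oid_as_varchar() {
        // An ObjectId `_id` is extracted as its hex string when the column is VARCHAR.
        let doc = serde_json::json!({"_id": {"$oid": "65bc9fb6c485f419a7a877fe"}});
        let id = extract_bson_id(&DataType::Varchar, &doc).unwrap();
        assert_eq!(id, Some(ScalarImpl::Utf8("65bc9fb6c485f419a7a877fe".into())));
    }

    #[test]
    fn extracts_number_long_as_int64() {
        // A `$numberLong` field is parsed into an INT64 datum.
        let doc = serde_json::json!({"count": {"$numberLong": "42"}});
        let val = extract_bson_field(&DataType::Int64, &doc, Some("count")).unwrap();
        assert_eq!(val, Some(ScalarImpl::Int64(42)));
    }
}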