risingwave_connector/parser/unified/debezium.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29use crate::parser::TransactionControl;
30use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
31use crate::parser::schema_change::TableChangeType;
32use crate::source::cdc::build_cdc_table_id;
33use crate::source::cdc::external::mysql::{
34    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
35};
36use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
37use crate::source::{ConnectorProperties, SourceColumnDesc};
38
39/// Checks if a given default value expression is a BIGSERIAL default.
40///
41/// Normally, all unsupported expressions should fail in `from_text` parsing.
42/// However, we make a special exception for the `nextval()` function because:
43/// 1. When users set a column as BIGSERIAL (usually as the primary key), PostgreSQL automatically generates
44///    a default value expression like `nextval('sequence_name'::regclass)`
45/// 2. This is a very common scenario in real-world usage
46/// 3. For existing columns with a `nextval()` default, we skip parsing the default value
47///    to avoid schema change failures
48///
49/// TODO: In the future, if we can distinguish between newly added columns and existing columns,
50/// we should modify this logic to:
51/// - Skip default value parsing for existing columns with `nextval()`
52/// - Report an error for newly added columns with `nextval()` (since they should be handled differently)
53fn is_bigserial_default(default_value_expression: &str) -> bool {
54    if default_value_expression.trim().is_empty() {
55        return false;
56    }
57
58    let expr = default_value_expression.trim();
59
60    // Return true if it is a `nextval()` function (bigserial default).
61    expr.starts_with("nextval(")
62}
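// A minimal usage sketch for `is_bigserial_default`; the sequence name and the
// constant default below are hypothetical examples, not taken from a real schema.
#[cfg(test)]
mod is_bigserial_default_example {
    use super::is_bigserial_default;

    #[test]
    fn detects_nextval_defaults() {
        // BIGSERIAL columns carry a `nextval(...)` default, which we skip parsing.
        assert!(is_bigserial_default(
            "nextval('orders_o_orderkey_seq'::regclass)"
        ));
        // Constant defaults and blank expressions are not treated as BIGSERIAL.
        assert!(!is_bigserial_default("42"));
        assert!(!is_bigserial_default("   "));
    }
}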
63
64// Example of Debezium JSON value:
65// {
66//     "payload":
67//     {
68//         "before": null,
69//         "after":
70//         {
71//             "O_ORDERKEY": 5,
72//             "O_CUSTKEY": 44485,
73//             "O_ORDERSTATUS": "F",
74//             "O_TOTALPRICE": "144659.20",
75//             "O_ORDERDATE": "1994-07-30"
76//         },
77//         "source":
78//         {
79//             "version": "1.9.7.Final",
80//             "connector": "mysql",
81//             "name": "RW_CDC_1002",
82//             "ts_ms": 1695277757000,
83//             "db": "mydb",
84//             "sequence": null,
85//             "table": "orders",
86//             "server_id": 0,
87//             "gtid": null,
88//             "file": "binlog.000008",
89//             "pos": 3693,
90//             "row": 0
91//         },
92//         "op": "r",
93//         "ts_ms": 1695277757017,
94//         "transaction": null
95//     }
96// }
97pub struct DebeziumChangeEvent<A> {
98    value_accessor: Option<A>,
99    key_accessor: Option<A>,
100    is_mongodb: bool,
101}
102
103const BEFORE: &str = "before";
104const AFTER: &str = "after";
105
106const UPSTREAM_DDL: &str = "ddl";
107const SOURCE: &str = "source";
108const SOURCE_TS_MS: &str = "ts_ms";
109const SOURCE_DB: &str = "db";
110const SOURCE_SCHEMA: &str = "schema";
111const SOURCE_TABLE: &str = "table";
112const SOURCE_COLLECTION: &str = "collection";
113
114const OP: &str = "op";
115pub const TRANSACTION_STATUS: &str = "status";
116pub const TRANSACTION_ID: &str = "id";
117
118pub const TABLE_CHANGES: &str = "tableChanges";
119
120pub const DEBEZIUM_READ_OP: &str = "r";
121pub const DEBEZIUM_CREATE_OP: &str = "c";
122pub const DEBEZIUM_UPDATE_OP: &str = "u";
123pub const DEBEZIUM_DELETE_OP: &str = "d";
124
125pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
126pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
127
128pub fn parse_transaction_meta(
129    accessor: &impl Access,
130    connector_props: &ConnectorProperties,
131) -> AccessResult<TransactionControl> {
132    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
133        accessor
134            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
135            .to_datum_ref(),
136        accessor
137            .access(&[TRANSACTION_ID], &DataType::Varchar)?
138            .to_datum_ref(),
139    ) {
140        // The id field has different meanings for different databases:
141        // PG: txID:LSN
142        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
143        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
144        match status {
145            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
146                ConnectorProperties::PostgresCdc(_) => {
147                    let (tx_id, _) = id.split_once(':').unwrap();
148                    return Ok(TransactionControl::Begin { id: tx_id.into() });
149                }
150                ConnectorProperties::MysqlCdc(_) => {
151                    return Ok(TransactionControl::Begin { id: id.into() });
152                }
153                ConnectorProperties::SqlServerCdc(_) => {
154                    return Ok(TransactionControl::Begin { id: id.into() });
155                }
156                _ => {}
157            },
158            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
159                ConnectorProperties::PostgresCdc(_) => {
160                    let (tx_id, _) = id.split_once(':').unwrap();
161                    return Ok(TransactionControl::Commit { id: tx_id.into() });
162                }
163                ConnectorProperties::MysqlCdc(_) => {
164                    return Ok(TransactionControl::Commit { id: id.into() });
165                }
166                ConnectorProperties::SqlServerCdc(_) => {
167                    return Ok(TransactionControl::Commit { id: id.into() });
168                }
169                _ => {}
170            },
171            _ => {}
172        }
173    }
174
175    Err(AccessError::Undefined {
176        name: "transaction status".into(),
177        path: TRANSACTION_STATUS.into(),
178    })
179}
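// A small illustration of the transaction `id` formats described above. The
// concrete id string is a hypothetical example, not a captured payload.
#[cfg(test)]
mod transaction_id_format_example {
    #[test]
    fn postgres_id_is_txid_colon_lsn() {
        // For Postgres, only the txID part before the colon identifies the transaction,
        // which is exactly what `parse_transaction_meta` keeps.
        let id = "735:0/2366C50";
        let (tx_id, lsn) = id.split_once(':').unwrap();
        assert_eq!(tx_id, "735");
        assert_eq!(lsn, "0/2366C50");
    }
}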
180
181macro_rules! jsonb_access_field {
182    ($col:expr, $field:expr, $as_type:tt) => {
183        $crate::paste! {
184            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
185        }
186    };
187}
188
189/// Parse the schema change message from Debezium.
190/// For the layout of the MySQL schema change message, refer to
191/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
192pub fn parse_schema_change(
193    accessor: &impl Access,
194    source_id: SourceId,
195    source_name: &str,
196    connector_props: &ConnectorProperties,
197) -> AccessResult<SchemaChangeEnvelope> {
198    let mut schema_changes = vec![];
199
200    let upstream_ddl: String = accessor
201        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
202        .to_owned_datum()
203        .unwrap()
204        .as_utf8()
205        .to_string();
206
207    if let Some(ScalarRefImpl::List(table_changes)) = accessor
208        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
209        .to_datum_ref()
210    {
211        for datum in table_changes.iter() {
212            let jsonb = match datum {
213                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
214                _ => unreachable!(""),
215            };
216            let id: String = jsonb_access_field!(jsonb, "id", string);
217            let ty = jsonb_access_field!(jsonb, "type", string);
218
219            let table_name = id.trim_matches('"').to_owned();
220            let ddl_type: TableChangeType = ty.as_str().into();
221            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
222                tracing::debug!("skip table schema change for create/drop command");
223                continue;
224            }
225
226            let mut column_descs: Vec<ColumnDesc> = vec![];
227            if let Some(table) = jsonb.access_object_field("table")
228                && let Some(columns) = table.access_object_field("columns")
229            {
230                for col in columns.array_elements().unwrap() {
231                    let name = jsonb_access_field!(col, "name", string);
232                    let type_name = jsonb_access_field!(col, "typeName", string);
233                    // Determine if this column is an enum type
234                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
235                    let data_type = match *connector_props {
236                        ConnectorProperties::PostgresCdc(_) => {
237                            let ty = type_name_to_pg_type(type_name.as_str());
238                            if is_enum {
239                                tracing::debug!(target: "auto_schema_change",
240                                    "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
241                                DataType::Varchar
242                            } else {
243                                match ty {
244                                    Some(ty) => match pg_type_to_rw_type(&ty) {
245                                        Ok(data_type) => data_type,
246                                        Err(err) => {
247                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
248                                            return Err(AccessError::CdcAutoSchemaChangeError {
249                                                ty: type_name,
250                                                table_name: format!(
251                                                    "{}.{}",
252                                                    source_name, table_name
253                                                ),
254                                            });
255                                        }
256                                    },
257                                    None => {
258                                        return Err(AccessError::CdcAutoSchemaChangeError {
259                                            ty: type_name,
260                                            table_name: format!("{}.{}", source_name, table_name),
261                                        });
262                                    }
263                                }
264                            }
265                        }
266                        ConnectorProperties::MysqlCdc(_) => {
267                            let ty = type_name_to_mysql_type(type_name.as_str());
268                            match ty {
269                                Some(ty) => match mysql_type_to_rw_type(&ty) {
270                                    Ok(data_type) => data_type,
271                                    Err(err) => {
272                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
273                                        return Err(AccessError::CdcAutoSchemaChangeError {
274                                            ty: type_name,
275                                            table_name: format!("{}.{}", source_name, table_name),
276                                        });
277                                    }
278                                },
279                                None => {
280                                    return Err(AccessError::CdcAutoSchemaChangeError {
281                                        ty: type_name,
282                                        table_name: format!("{}.{}", source_name, table_name),
283                                    });
284                                }
285                            }
286                        }
287                        _ => {
288                            unreachable!()
289                        }
290                    };
291
292                    // Handle the default value expression; currently we only support constant expressions.
293                    let column_desc = match col.access_object_field("defaultValueExpression") {
294                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
295                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
296                            // Only process constant default values
297                            if is_bigserial_default(default_val_expr_str) {
298                                tracing::warn!(target: "auto_schema_change",
299                                    "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
300                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
301                            } else {
302                                let value_text: Option<String>;
303                                match *connector_props {
304                                    ConnectorProperties::PostgresCdc(_) => {
305                                        // The default value of a non-numeric data type is stored as
306                                        // "'value'::type".
307                                        match default_val_expr_str
308                                            .split("::")
309                                            .map(|s| s.trim_matches('\''))
310                                            .next()
311                                        {
312                                            None => {
313                                                value_text = None;
314                                            }
315                                            Some(val_text) => {
316                                                value_text = Some(val_text.to_owned());
317                                            }
318                                        }
319                                    }
320                                    ConnectorProperties::MysqlCdc(_) => {
321                                        // MySQL TIMESTAMP is mapped to timestamptz; we use the UTC timezone
322                                        // to interpret its value.
323                                        if data_type == DataType::Timestamptz {
324                                            value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
325                                                tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
326                                                AccessError::TypeError {
327                                                    expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
328                                                    got: data_type.to_string(),
329                                                    value: default_val_expr_str.to_owned(),
330                                                }
331                                            })?);
332                                        } else {
333                                            value_text = Some(default_val_expr_str.to_owned());
334                                        }
335                                    }
336                                    _ => {
337                                        unreachable!("connector doesn't support schema change")
338                                    }
339                                }
340
341                                let snapshot_value: Datum = if let Some(value_text) = value_text {
342                                    Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
343                                        |err| {
344                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
345                                            AccessError::TypeError {
346                                                expected: "constant expression".into(),
347                                                got: data_type.to_string(),
348                                                value: value_text,
349                                            }
350                                        },
351                                    )?)
352                                } else {
353                                    None
354                                };
355
356                                if snapshot_value.is_none() {
357                                    tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
358                                    ColumnDesc::named(name, ColumnId::placeholder(), data_type)
359                                } else {
360                                    ColumnDesc::named_with_default_value(
361                                        name,
362                                        ColumnId::placeholder(),
363                                        data_type,
364                                        snapshot_value,
365                                    )
366                                }
367                            }
368                        }
369                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
370                    };
371                    column_descs.push(column_desc);
372                }
373            }
374
375            // Concatenate the source_id and the upstream table id to form the cdc_table_id.
376            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
377            schema_changes.push(TableSchemaChange {
378                cdc_table_id,
379                columns: column_descs
380                    .into_iter()
381                    .map(|column_desc| ColumnCatalog {
382                        column_desc,
383                        is_hidden: false,
384                    })
385                    .collect_vec(),
386                change_type: ty.as_str().into(),
387                upstream_ddl: upstream_ddl.clone(),
388            });
389        }
390
391        Ok(SchemaChangeEnvelope {
392            table_changes: schema_changes,
393        })
394    } else {
395        Err(AccessError::Undefined {
396            name: "table schema change".into(),
397            path: TABLE_CHANGES.into(),
398        })
399    }
400}
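// A minimal sketch of the Postgres default-value normalization performed above:
// non-numeric defaults arrive as "'value'::type" and only the quoted literal is
// kept before `from_text` parsing. The expressions below are hypothetical.
#[cfg(test)]
mod pg_default_value_example {
    #[test]
    fn strips_quotes_and_type_cast() {
        let expr = "'pending'::character varying";
        let value_text = expr.split("::").map(|s| s.trim_matches('\'')).next();
        assert_eq!(value_text, Some("pending"));
    }

    #[test]
    fn numeric_defaults_pass_through() {
        let expr = "0";
        let value_text = expr.split("::").map(|s| s.trim_matches('\'')).next();
        assert_eq!(value_text, Some("0"));
    }
}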
401
402impl<A> DebeziumChangeEvent<A>
403where
404    A: Access,
405{
406    /// Panics if neither `key_accessor` nor `value_accessor` is provided.
407    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
408        assert!(key_accessor.is_some() || value_accessor.is_some());
409        Self {
410            value_accessor,
411            key_accessor,
412            is_mongodb: false,
413        }
414    }
415
416    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
417        assert!(key_accessor.is_some() || value_accessor.is_some());
418        Self {
419            value_accessor,
420            key_accessor,
421            is_mongodb: true,
422        }
423    }
424
425    /// Returns the transaction metadata if it exists.
426    ///
427    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
428    pub(crate) fn transaction_control(
429        &self,
430        connector_props: &ConnectorProperties,
431    ) -> Option<TransactionControl> {
432        // Ignore if `value_accessor` is not provided or there's any error when
433        // trying to parse the transaction metadata.
434        self.value_accessor
435            .as_ref()
436            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
437    }
438}
439
440impl<A> ChangeEvent for DebeziumChangeEvent<A>
441where
442    A: Access,
443{
444    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
445        match self.op()? {
446            ChangeEventOperation::Delete => {
447                // For MongoDB delete events, the "before" and "after" fields are both null in the value,
448                // so we need to extract the _id field from the key.
449                if self.is_mongodb && desc.name == "_id" {
450                    return self
451                        .key_accessor
452                        .as_ref()
453                        .expect("key_accessor must be provided for delete operation")
454                        .access(&[&desc.name], &desc.data_type);
455                }
456
457                if let Some(va) = self.value_accessor.as_ref() {
458                    va.access(&[BEFORE, &desc.name], &desc.data_type)
459                } else {
460                    self.key_accessor
461                        .as_ref()
462                        .unwrap()
463                        .access(&[&desc.name], &desc.data_type)
464                }
465            }
466
467            // value should not be None.
468            ChangeEventOperation::Upsert => {
469                // For an upsert operation, if `desc` is an additional column, access the field under `SOURCE`.
470                desc.additional_column.column_type.as_ref().map_or_else(
471                    || {
472                        self.value_accessor
473                            .as_ref()
474                            .expect("value_accessor must be provided for upsert operation")
475                            .access(&[AFTER, &desc.name], &desc.data_type)
476                    },
477                    |additional_column_type| {
478                        match *additional_column_type {
479                            ColumnType::Timestamp(_) => {
480                                // access payload.source.ts_ms
481                                let ts_ms = self
482                                    .value_accessor
483                                    .as_ref()
484                                    .expect("value_accessor must be provided for upsert operation")
485                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
486                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
487                                    Timestamptz::from_millis(scalar.into_int64())
488                                        .expect("source.ts_ms must be in milliseconds")
489                                        .to_scalar_value()
490                                })))
491                            }
492                            ColumnType::DatabaseName(_) => self
493                                .value_accessor
494                                .as_ref()
495                                .expect("value_accessor must be provided for upsert operation")
496                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
497                            ColumnType::SchemaName(_) => self
498                                .value_accessor
499                                .as_ref()
500                                .expect("value_accessor must be provided for upsert operation")
501                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
502                            ColumnType::TableName(_) => self
503                                .value_accessor
504                                .as_ref()
505                                .expect("value_accessor must be provided for upsert operation")
506                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
507                            ColumnType::CollectionName(_) => self
508                                .value_accessor
509                                .as_ref()
510                                .expect("value_accessor must be provided for upsert operation")
511                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
512                            _ => Err(AccessError::UnsupportedAdditionalColumn {
513                                name: desc.name.clone(),
514                            }),
515                        }
516                    },
517                )
518            }
519        }
520    }
521
522    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
523        if let Some(accessor) = &self.value_accessor {
524            if let Some(ScalarRefImpl::Utf8(op)) =
525                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
526            {
527                match op {
528                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
529                        return Ok(ChangeEventOperation::Upsert);
530                    }
531                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
532                    _ => (),
533                }
534            }
535            Err(super::AccessError::Undefined {
536                name: "op".into(),
537                path: Default::default(),
538            })
539        } else {
540            Ok(ChangeEventOperation::Delete)
541        }
542    }
543}
544
545/// Access support for MongoDB.
546///
547/// For now, we only consider `strong_schema`-typed `MongoDB` Debezium event JSONs.
548pub struct MongoJsonAccess<A> {
549    accessor: A,
550    strong_schema: bool,
551}
552
553pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
554    let id_field = if let Some(value) = bson_doc.get("_id") {
555        value
556    } else {
557        bson_doc
558    };
559
560    let type_error = || AccessError::TypeError {
561        expected: id_type.to_string(),
562        got: match id_field {
563            serde_json::Value::Null => "null",
564            serde_json::Value::Bool(_) => "bool",
565            serde_json::Value::Number(_) => "number",
566            serde_json::Value::String(_) => "string",
567            serde_json::Value::Array(_) => "array",
568            serde_json::Value::Object(_) => "object",
569        }
570        .to_owned(),
571        value: id_field.to_string(),
572    };
573
574    let id: Datum = match id_type {
575        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
576        DataType::Varchar => match id_field {
577            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
578            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
579                obj["$oid"].as_str().unwrap_or_default().into(),
580            )),
581            _ => return Err(type_error()),
582        },
583        DataType::Int32 => {
584            if let serde_json::Value::Object(obj) = id_field
585                && obj.contains_key("$numberInt")
586            {
587                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
588                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
589            } else {
590                return Err(type_error());
591            }
592        }
593        DataType::Int64 => {
594            if let serde_json::Value::Object(obj) = id_field
595                && obj.contains_key("$numberLong")
596            {
597                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
598                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
599            } else {
600                return Err(type_error());
601            }
602        }
603        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
604    };
605    Ok(id)
606}
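// A usage sketch for `extract_bson_id`; the ObjectId and the numeric id below
// are hypothetical Extended JSON values.
#[cfg(test)]
mod extract_bson_id_example {
    use super::*;

    #[test]
    fn extracts_oid_as_varchar_and_long_as_int64() {
        let doc = serde_json::json!({"_id": {"$oid": "65bc9fb6c485f419a7a877fe"}});
        assert_eq!(
            extract_bson_id(&DataType::Varchar, &doc).unwrap(),
            Some(ScalarImpl::Utf8("65bc9fb6c485f419a7a877fe".into()))
        );

        let doc = serde_json::json!({"_id": {"$numberLong": "42"}});
        assert_eq!(
            extract_bson_id(&DataType::Int64, &doc).unwrap(),
            Some(ScalarImpl::Int64(42))
        );
    }
}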
607
608/// Extracts the field data from the BSON document.
609///
610/// A BSON document is represented as a JSON object with some special fields, such as:
611/// long integer: {"$numberLong": "1630454400000"}
612/// date time: {"$date": {"$numberLong": "1630454400000"}}
613///
614/// For dates and timestamps, the Canonical format is the primary target; the Relaxed `$date` string is also accepted.
615///
616/// # NOTE:
617///
618/// - `field` indicates the field name in the BSON document; if it is `None`, `bson_doc` itself is the field.
619// Similar to extracting the "_id" field from the message payload.
620pub fn extract_bson_field(
621    type_expected: &DataType,
622    bson_doc: &serde_json::Value,
623    field: Option<&str>,
624) -> AccessResult {
625    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
626        expected: type_expected.to_string(),
627        got: match datum {
628            serde_json::Value::Null => "null",
629            serde_json::Value::Bool(_) => "bool",
630            serde_json::Value::Number(_) => "number",
631            serde_json::Value::String(_) => "string",
632            serde_json::Value::Array(_) => "array",
633            serde_json::Value::Object(_) => "object",
634        }
635        .to_owned(),
636        value: datum.to_string(),
637    };
638
639    let datum = if let Some(field) = field {
640        let Some(bson_doc) = bson_doc.get(field) else {
641            return Err(type_error(bson_doc));
642        };
643        bson_doc
644    } else {
645        bson_doc
646    };
647
648    if datum.is_null() {
649        return Ok(None);
650    }
651
652    let field_datum: Datum = match type_expected {
653        DataType::Boolean => {
654            if datum.is_boolean() {
655                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
656            } else {
657                return Err(type_error(datum));
658            }
659        }
660        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
661        DataType::Varchar => match datum {
662            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
663            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
664                obj["$oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
665            }
666            _ => return Err(type_error(datum)),
667        },
668        DataType::Int16
669        | DataType::Int32
670        | DataType::Int64
671        | DataType::Int256
672        | DataType::Float32
673        | DataType::Float64 => {
674            if !datum.is_object() {
675                return Err(type_error(datum));
676            };
677
678            bson_extract_number(datum, type_expected)?
679        }
680
681        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
682            if let serde_json::Value::Object(mp) = datum {
683                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
684                    bson_extract_timestamp(datum, type_expected)?
685                } else if mp.contains_key("$date") {
686                    bson_extract_date(datum, type_expected)?
687                } else {
688                    return Err(type_error(datum));
689                }
690            } else {
691                return Err(type_error(datum));
692            }
693        }
694        DataType::Decimal => {
695            if let serde_json::Value::Object(obj) = datum
696                && obj.contains_key("$numberDecimal")
697                && obj["$numberDecimal"].is_string()
698            {
699                let number = obj["$numberDecimal"].as_str().unwrap();
700
701                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
702                    AccessError::TypeError {
703                        expected: type_expected.to_string(),
704                        got: "unparsable string".into(),
705                        value: number.to_owned(),
706                    }
707                })?;
708                Some(ScalarImpl::Decimal(dec))
709            } else {
710                return Err(type_error(datum));
711            }
712        }
713
714        DataType::Bytea => {
715            if let serde_json::Value::Object(obj) = datum
716                && obj.contains_key("$binary")
717                && obj["$binary"].is_object()
718            {
719                use base64::Engine;
720
721                let binary = obj["$binary"].as_object().unwrap();
722
723                if !binary.contains_key("$base64")
724                    || !binary["$base64"].is_string()
725                    || !binary.contains_key("$subType")
726                    || !binary["$subType"].is_string()
727                {
728                    return Err(AccessError::TypeError {
729                        expected: type_expected.to_string(),
730                        got: "object".into(),
731                        value: datum.to_string(),
732                    });
733                }
734
735                let b64_str = binary["$base64"]
736                    .as_str()
737                    .ok_or_else(|| AccessError::TypeError {
738                        expected: type_expected.to_string(),
739                        got: "object".into(),
740                        value: datum.to_string(),
741                    })?;
742
743                // The binary subtype is not used for now.
744                let _type_str =
745                    binary["$subType"]
746                        .as_str()
747                        .ok_or_else(|| AccessError::TypeError {
748                            expected: type_expected.to_string(),
749                            got: "object".into(),
750                            value: datum.to_string(),
751                        })?;
752
753                let bytes = base64::prelude::BASE64_STANDARD
754                    .decode(b64_str)
755                    .map_err(|_| AccessError::TypeError {
756                        expected: "$binary object with $base64 string and $subType string field"
757                            .to_owned(),
758                        got: "string".to_owned(),
759                        value: bson_doc.to_string(),
760                    })?;
761                let bytea = ScalarImpl::Bytea(bytes.into());
762                Some(bytea)
763            } else {
764                return Err(type_error(datum));
765            }
766        }
767
768        DataType::Struct(struct_fields) => {
769            let mut datums = vec![];
770            for (field_name, field_type) in struct_fields.iter() {
771                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
772                datums.push(field_datum);
773            }
774            let value = StructValue::new(datums);
775
776            Some(ScalarImpl::Struct(value))
777        }
778
779        DataType::List(list_type) => {
780            let elem_type = list_type.elem();
781            let Some(d_array) = datum.as_array() else {
782                return Err(type_error(datum));
783            };
784
785            let mut builder = elem_type.create_array_builder(d_array.len());
786            for item in d_array {
787                builder.append(extract_bson_field(elem_type, item, None)?);
788            }
789            Some(ScalarImpl::from(ListValue::new(builder.finish())))
790        }
791
792        _ => {
793            if let Some(field_name) = field {
794                unreachable!(
795                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
796                )
797            } else {
798                let type_expected = type_expected.to_string();
799                unreachable!(
800                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
801                )
802            }
803        }
804    };
805    Ok(field_datum)
806}
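// A minimal sketch of `extract_bson_field` on a Canonical Extended JSON
// document; the field names and values are hypothetical.
#[cfg(test)]
mod extract_bson_field_example {
    use super::*;

    #[test]
    fn extracts_scalar_fields() {
        let doc = serde_json::json!({
            "views": {"$numberLong": "42"},
            "title": "hello",
            "published": true
        });
        assert_eq!(
            extract_bson_field(&DataType::Int64, &doc, Some("views")).unwrap(),
            Some(ScalarImpl::Int64(42))
        );
        assert_eq!(
            extract_bson_field(&DataType::Varchar, &doc, Some("title")).unwrap(),
            Some(ScalarImpl::Utf8("hello".into()))
        );
        assert_eq!(
            extract_bson_field(&DataType::Boolean, &doc, Some("published")).unwrap(),
            Some(ScalarImpl::Bool(true))
        );
    }
}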
807
808fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
809    let field_name = match type_expected {
810        DataType::Int16 => "$numberInt",
811        DataType::Int32 => "$numberInt",
812        DataType::Int64 => "$numberLong",
813        DataType::Int256 => "$numberLong",
814        DataType::Float32 => "$numberDouble",
815        DataType::Float64 => "$numberDouble",
816        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
817    };
818
819    let datum = bson_doc.get(field_name);
820    if datum.is_none() {
821        return Err(AccessError::TypeError {
822            expected: type_expected.to_string(),
823            got: "object".into(),
824            value: bson_doc.to_string(),
825        });
826    }
827
828    let datum = datum.unwrap();
829
830    if datum.is_string() {
831        let Some(num_str) = datum.as_str() else {
832            return Err(AccessError::TypeError {
833                expected: type_expected.to_string(),
834                got: "string".into(),
835                value: datum.to_string(),
836            });
837        };
838        // parse to float
839        if [DataType::Float32, DataType::Float64].contains(type_expected) {
840            match (num_str, type_expected) {
841                ("Infinity", DataType::Float64) => {
842                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
843                }
844                ("Infinity", DataType::Float32) => {
845                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
846                }
847                ("-Infinity", DataType::Float64) => {
848                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
849                }
850                ("-Infinity", DataType::Float32) => {
851                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
852                }
853                ("NaN", DataType::Float64) => {
854                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
855                }
856                ("NaN", DataType::Float32) => {
857                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
858                }
859                _ => {}
860            }
861
862            let parsed_num: f64 = match num_str.parse() {
863                Ok(n) => n,
864                Err(_e) => {
865                    return Err(AccessError::TypeError {
866                        expected: type_expected.to_string(),
867                        got: "string".into(),
868                        value: num_str.to_owned(),
869                    });
870                }
871            };
872            if *type_expected == DataType::Float64 {
873                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
874            } else {
875                let parsed_num = parsed_num as f32;
876                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
877            }
878        }
879        // parse to large int
880        if *type_expected == DataType::Int256 {
881            let parsed_num = match Int256::from_str(num_str) {
882                Ok(n) => n,
883                Err(_) => {
884                    return Err(AccessError::TypeError {
885                        expected: type_expected.to_string(),
886                        got: "string".into(),
887                        value: num_str.to_owned(),
888                    });
889                }
890            };
891            return Ok(Some(ScalarImpl::Int256(parsed_num)));
892        }
893
894        // parse to integer
895        let parsed_num: i64 = match num_str.parse() {
896            Ok(n) => n,
897            Err(_e) => {
898                return Err(AccessError::TypeError {
899                    expected: type_expected.to_string(),
900                    got: "string".into(),
901                    value: num_str.to_owned(),
902                });
903            }
904        };
905        match type_expected {
906            DataType::Int16 => {
907                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
908                    return Err(AccessError::TypeError {
909                        expected: type_expected.to_string(),
910                        got: "string".into(),
911                        value: num_str.to_owned(),
912                    });
913                }
914                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
915            }
916            DataType::Int32 => {
917                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
918                    return Err(AccessError::TypeError {
919                        expected: type_expected.to_string(),
920                        got: "string".into(),
921                        value: num_str.to_owned(),
922                    });
923                }
924                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
925            }
926            DataType::Int64 => {
927                return Ok(Some(ScalarImpl::Int64(parsed_num)));
928            }
929            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
930        }
931    }
932    if datum.is_null() {
933        return Err(AccessError::TypeError {
934            expected: type_expected.to_string(),
935            got: "null".into(),
936            value: bson_doc.to_string(),
937        });
938    }
939
940    if datum.is_array() {
941        return Err(AccessError::TypeError {
942            expected: type_expected.to_string(),
943            got: "array".to_owned(),
944            value: datum.to_string(),
945        });
946    }
947
948    if datum.is_object() {
949        return Err(AccessError::TypeError {
950            expected: type_expected.to_string(),
951            got: "object".to_owned(),
952            value: datum.to_string(),
953        });
954    }
955
956    if datum.is_boolean() {
957        return Err(AccessError::TypeError {
958            expected: type_expected.to_string(),
959            got: "boolean".into(),
960            value: bson_doc.to_string(),
961        });
962    }
963
964    if datum.is_number() {
965        let got_type = if datum.is_f64() { "f64" } else { "i64" };
966        return Err(AccessError::TypeError {
967            expected: type_expected.to_string(),
968            got: got_type.into(),
969            value: bson_doc.to_string(),
970        });
971    }
972
973    Err(AccessError::TypeError {
974        expected: type_expected.to_string(),
975        got: "unknown".into(),
976        value: bson_doc.to_string(),
977    })
978}
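// A small sketch of `bson_extract_number` edge cases: the special "Infinity"
// string and an out-of-range INT2 value. The documents are hypothetical.
#[cfg(test)]
mod bson_extract_number_example {
    use super::*;

    #[test]
    fn parses_infinity_and_rejects_overflow() {
        let doc = serde_json::json!({"$numberDouble": "Infinity"});
        assert_eq!(
            bson_extract_number(&doc, &DataType::Float64).unwrap(),
            Some(ScalarImpl::Float64(f64::INFINITY.into()))
        );

        // 70000 does not fit into an i16, so the conversion is rejected.
        let doc = serde_json::json!({"$numberInt": "70000"});
        assert!(bson_extract_number(&doc, &DataType::Int16).is_err());
    }
}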
979
980fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
981    // According to MongoDB Extended JSON v2, the `$date` value can be encoded as:
982    //
983    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
984    // the date is encoded as the number of milliseconds since the Unix epoch
985    //
986    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
987    // the date is encoded as an ISO 8601 string
988    //
989    // Legacy (Extended JSON v1): {"$date": 1630454400000}
990    // the date is encoded as a bare number of milliseconds since the Unix epoch
991    //
992    // The `$timestamp` representation ({"$timestamp": {"t": 1630454400, "i": 1}})
993    // is handled separately in `bson_extract_timestamp`.
994    //
995    let datum = &bson_doc["$date"];
996
997    let type_error = || AccessError::TypeError {
998        expected: type_expected.to_string(),
999        got: match bson_doc {
1000            serde_json::Value::Null => "null",
1001            serde_json::Value::Bool(_) => "bool",
1002            serde_json::Value::Number(_) => "number",
1003            serde_json::Value::String(_) => "string",
1004            serde_json::Value::Array(_) => "array",
1005            serde_json::Value::Object(_) => "object",
1006        }
1007        .to_owned(),
1008        value: datum.to_string(),
1009    };
1010
1011    // Handle the Canonical, Relaxed, and legacy (v1) formats.
1012    let millis = match datum {
1013        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
1014        serde_json::Value::Object(obj)
1015            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1016        {
1017            obj["$numberLong"]
1018                .as_str()
1019                .unwrap()
1020                .parse::<i64>()
1021                .map_err(|_| AccessError::TypeError {
1022                    expected: "timestamp".into(),
1023                    got: "object".into(),
1024                    value: datum.to_string(),
1025                })?
1026        }
1027        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
1028        serde_json::Value::String(s) => {
1029            let dt =
1030                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1031                    expected: "valid ISO-8601 date string".into(),
1032                    got: "string".into(),
1033                    value: datum.to_string(),
1034                })?;
1035            dt.timestamp_millis()
1036        }
1037
1038        // jsonv1 format
1039        // {"$date": 1630454400000}
1040        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1041            expected: "timestamp".into(),
1042            got: "number".into(),
1043            value: datum.to_string(),
1044        })?,
1045
1046        _ => return Err(type_error()),
1047    };
1048
1049    let datetime =
1050        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1051            expected: "timestamp".into(),
1052            got: "object".into(),
1053            value: datum.to_string(),
1054        })?;
1055
1056    let res = match type_expected {
1057        DataType::Date => {
1058            let naive = datetime.naive_local();
1059            let dt = naive.date();
1060            Some(ScalarImpl::Date(dt.into()))
1061        }
1062        DataType::Time => {
1063            let naive = datetime.naive_local();
1064            let dt = naive.time();
1065            Some(ScalarImpl::Time(dt.into()))
1066        }
1067        DataType::Timestamp => {
1068            let naive = datetime.naive_local();
1069            let dt = Timestamp::from(naive);
1070            Some(ScalarImpl::Timestamp(dt))
1071        }
1072        DataType::Timestamptz => {
1073            let dt = datetime.into();
1074            Some(ScalarImpl::Timestamptz(dt))
1075        }
1076        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1077    };
1078    Ok(res)
1079}
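// A usage sketch for `bson_extract_date` covering the Canonical and Relaxed
// `$date` encodings; the two hypothetical documents denote the same instant.
#[cfg(test)]
mod bson_extract_date_example {
    use super::*;

    #[test]
    fn canonical_and_relaxed_dates_agree() {
        let canonical = serde_json::json!({"$date": {"$numberLong": "1630454400000"}});
        let relaxed = serde_json::json!({"$date": "2021-09-01T00:00:00.000Z"});
        let expected = Some(ScalarImpl::Timestamptz(
            Timestamptz::from_millis(1_630_454_400_000).unwrap(),
        ));
        assert_eq!(
            bson_extract_date(&canonical, &DataType::Timestamptz).unwrap(),
            expected
        );
        assert_eq!(
            bson_extract_date(&relaxed, &DataType::Timestamptz).unwrap(),
            expected
        );
    }
}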
1080
1081fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1082    // According to MongoDB Extended JSON v2, the `$timestamp` type is encoded
1083    // identically in both the Canonical and Relaxed formats:
1084    //
1085    // {"$timestamp": {"t": 1630454400, "i": 1}}
1086    //
1087    // t is the number of seconds since the Unix epoch;
1088    // i is an incrementing ordinal for operations within the same second.
1089    //
1090    // The `$date` representations, e.g.
1091    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
1092    // (the number of milliseconds since the Unix epoch) and
1093    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
1094    // (an ISO 8601 string),
1095    // are handled in `bson_extract_date` instead.
1096    //
1097    // This function deals with the `$timestamp` representation only.
1098
1099    let Some(obj) = bson_doc["$timestamp"].as_object() else {
1100        return Err(AccessError::TypeError {
1101            expected: "timestamp".into(),
1102            got: "object".into(),
1103            value: bson_doc.to_string(),
1104        });
1105    };
1106
1107    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1108    {
1109        return Err(AccessError::TypeError {
1110            expected: "timestamp with valid seconds since epoch".into(),
1111            got: "object".into(),
1112            value: bson_doc.to_string(),
1113        });
1114    }
1115
1116    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1117        expected: "timestamp with valid seconds since epoch".into(),
1118        got: "object".into(),
1119        value: bson_doc.to_string(),
1120    })?;
1121
1122    let chrono_datetime =
1123        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1124            expected: type_expected.to_string(),
1125            got: "object".to_owned(),
1126            value: bson_doc.to_string(),
1127        })?;
1128
1129    let res = match type_expected {
1130        DataType::Date => {
1131            let naive = chrono_datetime.naive_local();
1132            let dt = naive.date();
1133            Some(ScalarImpl::Date(dt.into()))
1134        }
1135        DataType::Time => {
1136            let naive = chrono_datetime.naive_local();
1137            let dt = naive.time();
1138            Some(ScalarImpl::Time(dt.into()))
1139        }
1140        DataType::Timestamp => {
1141            let naive = chrono_datetime.naive_local();
1142            let dt = Timestamp::from(naive);
1143            Some(ScalarImpl::Timestamp(dt))
1144        }
1145        DataType::Timestamptz => {
1146            let dt = chrono_datetime.into();
1147            Some(ScalarImpl::Timestamptz(dt))
1148        }
1149        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1150    };
1151
1152    Ok(res)
1153}
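// A usage sketch for `bson_extract_timestamp` on the `$timestamp`
// representation; the value is a hypothetical example.
#[cfg(test)]
mod bson_extract_timestamp_example {
    use super::*;

    #[test]
    fn extracts_seconds_since_epoch() {
        let doc = serde_json::json!({"$timestamp": {"t": 1630454400, "i": 1}});
        assert_eq!(
            bson_extract_timestamp(&doc, &DataType::Timestamptz).unwrap(),
            Some(ScalarImpl::Timestamptz(
                Timestamptz::from_millis(1_630_454_400_000).unwrap()
            ))
        );
    }
}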
1154
1155impl<A> MongoJsonAccess<A> {
1156    pub fn new(accessor: A, strong_schema: bool) -> Self {
1157        Self {
1158            accessor,
1159            strong_schema,
1160        }
1161    }
1162}
1163
1164impl<A> Access for MongoJsonAccess<A>
1165where
1166    A: Access,
1167{
1168    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1169        match path {
1170            ["after" | "before", "_id"] => {
1171                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1172                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1173                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1174                } else {
1175                    // fail to extract the "_id" field from the message payload
1176                    Err(AccessError::Undefined {
1177                        name: "_id".to_owned(),
1178                        path: path[0].to_owned(),
1179                    })?
1180                }
1181            }
1182
1183            ["after" | "before", "payload"] if !self.strong_schema => {
1184                self.access(&[path[0]], &DataType::Jsonb)
1185            }
1186
1187            ["after" | "before", field] if self.strong_schema => {
1188                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1189                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1190                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1191                } else {
1192                    // fail to extract the expected field from the message payload
1193                    Err(AccessError::Undefined {
1194                        name: field.to_string(),
1195                        path: path[0].to_owned(),
1196                    })?
1197                }
1198            }
1199
1200            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
1201            // In addition, the "_id" field is named "id" in the key. An example message key:
1202            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
1203            ["_id"] => {
1204                let ret = self.accessor.access(path, type_expected);
1205                if matches!(ret, Err(AccessError::Undefined { .. })) {
1206                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1207                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1208                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1209                    } else {
1210                        // fail to extract the "_id" field from the message key
1211                        Err(AccessError::Undefined {
1212                            name: "_id".to_owned(),
1213                            path: "id".to_owned(),
1214                        })?
1215                    }
1216                } else {
1217                    ret
1218                }
1219            }
1220            _ => self.accessor.access(path, type_expected),
1221        }
1222    }
1223}