risingwave_connector/parser/unified/debezium.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::types::{
20    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
21    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
22};
23use risingwave_connector_codec::decoder::AccessExt;
24use risingwave_pb::plan_common::additional_column::ColumnType;
25use thiserror_ext::AsReport;
26
27use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
28use crate::parser::TransactionControl;
29use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
30use crate::parser::schema_change::TableChangeType;
31use crate::source::cdc::build_cdc_table_id;
32use crate::source::cdc::external::mysql::{
33    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
34};
35use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
36use crate::source::{ConnectorProperties, SourceColumnDesc};
37
38/// Checks if a given default value expression is a BIGSERIAL default.
39///
40/// Normally, all unsupported expressions should fail in `from_text` parsing.
41/// However, we make a special exception for the `nextval()` function because:
42/// 1. When users set a column as BIGSERIAL (usually as a primary key), PostgreSQL automatically generates
43///    a default value expression like `nextval('sequence_name'::regclass)`
44/// 2. This is a very common scenario in real-world usage
45/// 3. For existing columns with `nextval()` default, we skip parsing the default value
46///    to avoid schema change failures
47///
48/// TODO: In the future, if we can distinguish between newly added columns and existing columns,
49/// we should modify this logic to:
50/// - Skip default value parsing for existing columns with `nextval()`
51/// - Report error for newly added columns with `nextval()` (since they should be handled differently)
52fn is_bigserial_default(default_value_expression: &str) -> bool {
53    if default_value_expression.trim().is_empty() {
54        return false;
55    }
56
57    let expr = default_value_expression.trim();
58
59    // Return true if it is a `nextval()` function (bigserial default).
60    expr.starts_with("nextval(")
61}
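
// A minimal illustrative check of the rule above; the sequence name below is hypothetical.
#[cfg(test)]
mod bigserial_default_example {
    use super::is_bigserial_default;

    #[test]
    fn recognizes_only_nextval_defaults() {
        assert!(is_bigserial_default(
            "nextval('orders_o_orderkey_seq'::regclass)"
        ));
        assert!(!is_bigserial_default("42"));
        assert!(!is_bigserial_default("'F'::bpchar"));
        assert!(!is_bigserial_default(""));
    }
}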
62
63// Example of Debezium JSON value:
64// {
65//     "payload":
66//     {
67//         "before": null,
68//         "after":
69//         {
70//             "O_ORDERKEY": 5,
71//             "O_CUSTKEY": 44485,
72//             "O_ORDERSTATUS": "F",
73//             "O_TOTALPRICE": "144659.20",
74//             "O_ORDERDATE": "1994-07-30"
75//         },
76//         "source":
77//         {
78//             "version": "1.9.7.Final",
79//             "connector": "mysql",
80//             "name": "RW_CDC_1002",
81//             "ts_ms": 1695277757000,
82//             "db": "mydb",
83//             "sequence": null,
84//             "table": "orders",
85//             "server_id": 0,
86//             "gtid": null,
87//             "file": "binlog.000008",
88//             "pos": 3693,
89//             "row": 0,
90//         },
91//         "op": "r",
92//         "ts_ms": 1695277757017,
93//         "transaction": null
94//     }
95// }
96pub struct DebeziumChangeEvent<A> {
97    value_accessor: Option<A>,
98    key_accessor: Option<A>,
99    is_mongodb: bool,
100}
101
102const BEFORE: &str = "before";
103const AFTER: &str = "after";
104
105const UPSTREAM_DDL: &str = "ddl";
106const SOURCE: &str = "source";
107const SOURCE_TS_MS: &str = "ts_ms";
108const SOURCE_DB: &str = "db";
109const SOURCE_SCHEMA: &str = "schema";
110const SOURCE_TABLE: &str = "table";
111const SOURCE_COLLECTION: &str = "collection";
112
113const OP: &str = "op";
114pub const TRANSACTION_STATUS: &str = "status";
115pub const TRANSACTION_ID: &str = "id";
116
117pub const TABLE_CHANGES: &str = "tableChanges";
118
119pub const DEBEZIUM_READ_OP: &str = "r";
120pub const DEBEZIUM_CREATE_OP: &str = "c";
121pub const DEBEZIUM_UPDATE_OP: &str = "u";
122pub const DEBEZIUM_DELETE_OP: &str = "d";
123
124pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
125pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
126
127pub fn parse_transaction_meta(
128    accessor: &impl Access,
129    connector_props: &ConnectorProperties,
130) -> AccessResult<TransactionControl> {
131    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
132        accessor
133            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
134            .to_datum_ref(),
135        accessor
136            .access(&[TRANSACTION_ID], &DataType::Varchar)?
137            .to_datum_ref(),
138    ) {
139        // The id field has different meanings for different databases:
140        // PG: txID:LSN
141        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
142        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
143        match status {
144            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
145                ConnectorProperties::PostgresCdc(_) => {
146                    let (tx_id, _) = id.split_once(':').unwrap();
147                    return Ok(TransactionControl::Begin { id: tx_id.into() });
148                }
149                ConnectorProperties::MysqlCdc(_) => {
150                    return Ok(TransactionControl::Begin { id: id.into() });
151                }
152                ConnectorProperties::SqlServerCdc(_) => {
153                    return Ok(TransactionControl::Begin { id: id.into() });
154                }
155                _ => {}
156            },
157            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
158                ConnectorProperties::PostgresCdc(_) => {
159                    let (tx_id, _) = id.split_once(':').unwrap();
160                    return Ok(TransactionControl::Commit { id: tx_id.into() });
161                }
162                ConnectorProperties::MysqlCdc(_) => {
163                    return Ok(TransactionControl::Commit { id: id.into() });
164                }
165                ConnectorProperties::SqlServerCdc(_) => {
166                    return Ok(TransactionControl::Commit { id: id.into() });
167                }
168                _ => {}
169            },
170            _ => {}
171        }
172    }
173
174    Err(AccessError::Undefined {
175        name: "transaction status".into(),
176        path: TRANSACTION_STATUS.into(),
177    })
178}
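
// Illustrative transaction metadata values (the id formats follow the comment inside
// `parse_transaction_meta`; the concrete numbers are made up):
//
// PostgreSQL: {"status": "BEGIN", "id": "571:53195829"}
//   -> TransactionControl::Begin { id: "571" }   (only the txID before ':' is kept)
// MySQL:      {"status": "END", "id": "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"}
//   -> TransactionControl::Commit { id: "3E11FA47-71CA-11E1-9E33-C80AA9429562:23" }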
179
180macro_rules! jsonb_access_field {
181    ($col:expr, $field:expr, $as_type:tt) => {
182        $crate::paste! {
183            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
184        }
185    };
186}
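
// For illustration, `jsonb_access_field!(col, "name", string)` expands (roughly) to
// `col.access_object_field("name").unwrap().as_string().unwrap()`, so it panics if the
// field is missing or has an unexpected type.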
187
188/// Parse the schema change message from Debezium.
189/// The layout of the MySQL schema change message is documented at
190/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
191pub fn parse_schema_change(
192    accessor: &impl Access,
193    source_id: u32,
194    source_name: &str,
195    connector_props: &ConnectorProperties,
196) -> AccessResult<SchemaChangeEnvelope> {
197    let mut schema_changes = vec![];
198
199    let upstream_ddl: String = accessor
200        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
201        .to_owned_datum()
202        .unwrap()
203        .as_utf8()
204        .to_string();
205
206    if let Some(ScalarRefImpl::List(table_changes)) = accessor
207        .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
208        .to_datum_ref()
209    {
210        for datum in table_changes.iter() {
211            let jsonb = match datum {
212                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
213                _ => unreachable!("tableChanges elements must be jsonb"),
214            };
215            let id: String = jsonb_access_field!(jsonb, "id", string);
216            let ty = jsonb_access_field!(jsonb, "type", string);
217
218            let table_name = id.trim_matches('"').to_owned();
219            let ddl_type: TableChangeType = ty.as_str().into();
220            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
221                tracing::debug!("skip table schema change for create/drop command");
222                continue;
223            }
224
225            let mut column_descs: Vec<ColumnDesc> = vec![];
226            if let Some(table) = jsonb.access_object_field("table")
227                && let Some(columns) = table.access_object_field("columns")
228            {
229                for col in columns.array_elements().unwrap() {
230                    let name = jsonb_access_field!(col, "name", string);
231                    let type_name = jsonb_access_field!(col, "typeName", string);
232                    // Determine if this column is an enum type
233                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
234                    let data_type = match *connector_props {
235                        ConnectorProperties::PostgresCdc(_) => {
236                            let ty = type_name_to_pg_type(type_name.as_str());
237                            if is_enum {
238                                tracing::debug!(target: "auto_schema_change",
239                                    "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
240                                DataType::Varchar
241                            } else {
242                                match ty {
243                                    Some(ty) => match pg_type_to_rw_type(&ty) {
244                                        Ok(data_type) => data_type,
245                                        Err(err) => {
246                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
247                                            return Err(AccessError::CdcAutoSchemaChangeError {
248                                                ty: type_name,
249                                                table_name: format!(
250                                                    "{}.{}",
251                                                    source_name, table_name
252                                                ),
253                                            });
254                                        }
255                                    },
256                                    None => {
257                                        return Err(AccessError::CdcAutoSchemaChangeError {
258                                            ty: type_name,
259                                            table_name: format!("{}.{}", source_name, table_name),
260                                        });
261                                    }
262                                }
263                            }
264                        }
265                        ConnectorProperties::MysqlCdc(_) => {
266                            let ty = type_name_to_mysql_type(type_name.as_str());
267                            match ty {
268                                Some(ty) => match mysql_type_to_rw_type(&ty) {
269                                    Ok(data_type) => data_type,
270                                    Err(err) => {
271                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
272                                        return Err(AccessError::CdcAutoSchemaChangeError {
273                                            ty: type_name,
274                                            table_name: format!("{}.{}", source_name, table_name),
275                                        });
276                                    }
277                                },
278                                None => {
279                                    return Err(AccessError::CdcAutoSchemaChangeError {
280                                        ty: type_name,
281                                        table_name: format!("{}.{}", source_name, table_name),
282                                    });
283                                }
284                            }
285                        }
286                        _ => {
287                            unreachable!()
288                        }
289                    };
290
291                    // handle default value expression, currently we only support constant expression
292                    let column_desc = match col.access_object_field("defaultValueExpression") {
293                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
294                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
295                            // Only process constant default values
296                            if is_bigserial_default(default_val_expr_str) {
297                                tracing::warn!(target: "auto_schema_change",
298                                    "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
299                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
300                            } else {
301                                let value_text: Option<String>;
302                                match *connector_props {
303                                    ConnectorProperties::PostgresCdc(_) => {
304                                        // The default value of a non-numeric data type is stored as
305                                        // "'value'::type"
306                                        match default_val_expr_str
307                                            .split("::")
308                                            .map(|s| s.trim_matches('\''))
309                                            .next()
310                                        {
311                                            None => {
312                                                value_text = None;
313                                            }
314                                            Some(val_text) => {
315                                                value_text = Some(val_text.to_owned());
316                                            }
317                                        }
318                                    }
319                                    ConnectorProperties::MysqlCdc(_) => {
320                                        // MySQL TIMESTAMP is mapped to timestamptz; we use the UTC timezone to
321                                        // interpret its value.
322                                        if data_type == DataType::Timestamptz {
323                                            value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
324                                                tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
325                                                AccessError::TypeError {
326                                                    expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
327                                                    got: data_type.to_string(),
328                                                    value: default_val_expr_str.to_owned(),
329                                                }
330                                            })?);
331                                        } else {
332                                            value_text = Some(default_val_expr_str.to_owned());
333                                        }
334                                    }
335                                    _ => {
336                                        unreachable!("connector doesn't support schema change")
337                                    }
338                                }
339
340                                let snapshot_value: Datum = if let Some(value_text) = value_text {
341                                    Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
342                                        |err| {
343                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
344                                            AccessError::TypeError {
345                                                expected: "constant expression".into(),
346                                                got: data_type.to_string(),
347                                                value: value_text,
348                                            }
349                                        },
350                                    )?)
351                                } else {
352                                    None
353                                };
354
355                                if snapshot_value.is_none() {
356                                    tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
357                                    ColumnDesc::named(name, ColumnId::placeholder(), data_type)
358                                } else {
359                                    ColumnDesc::named_with_default_value(
360                                        name,
361                                        ColumnId::placeholder(),
362                                        data_type,
363                                        snapshot_value,
364                                    )
365                                }
366                            }
367                        }
368                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
369                    };
370                    column_descs.push(column_desc);
371                }
372            }
373
374            // concatenate the source_id and the upstream table id to form the cdc_table_id
375            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
376            schema_changes.push(TableSchemaChange {
377                cdc_table_id,
378                columns: column_descs
379                    .into_iter()
380                    .map(|column_desc| ColumnCatalog {
381                        column_desc,
382                        is_hidden: false,
383                    })
384                    .collect_vec(),
385                change_type: ty.as_str().into(),
386                upstream_ddl: upstream_ddl.clone(),
387            });
388        }
389
390        Ok(SchemaChangeEnvelope {
391            table_changes: schema_changes,
392        })
393    } else {
394        Err(AccessError::Undefined {
395            name: "table schema change".into(),
396            path: TABLE_CHANGES.into(),
397        })
398    }
399}
400
401impl<A> DebeziumChangeEvent<A>
402where
403    A: Access,
404{
405    /// Panics if neither `key_accessor` nor `value_accessor` is provided.
406    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
407        assert!(key_accessor.is_some() || value_accessor.is_some());
408        Self {
409            value_accessor,
410            key_accessor,
411            is_mongodb: false,
412        }
413    }
414
415    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
416        assert!(key_accessor.is_some() || value_accessor.is_some());
417        Self {
418            value_accessor,
419            key_accessor,
420            is_mongodb: true,
421        }
422    }
423
424    /// Returns the transaction metadata if it exists.
425    ///
426    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
427    pub(crate) fn transaction_control(
428        &self,
429        connector_props: &ConnectorProperties,
430    ) -> Option<TransactionControl> {
431        // Ignore if `value_accessor` is not provided or there's any error when
432        // trying to parse the transaction metadata.
433        self.value_accessor
434            .as_ref()
435            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
436    }
437}
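
// Construction sketch (illustrative): a regular data event can be wrapped as
// `DebeziumChangeEvent::new(None, Some(value_accessor))`; when no value accessor is given
// (e.g. a key-only tombstone), `op()` below treats the event as a delete.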
438
439impl<A> ChangeEvent for DebeziumChangeEvent<A>
440where
441    A: Access,
442{
443    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
444        match self.op()? {
445            ChangeEventOperation::Delete => {
446                // For MongoDB delete events, the "before" and "after" fields are both null in the value,
447                // we need to extract the _id field from the key.
448                if self.is_mongodb && desc.name == "_id" {
449                    return self
450                        .key_accessor
451                        .as_ref()
452                        .expect("key_accessor must be provided for delete operation")
453                        .access(&[&desc.name], &desc.data_type);
454                }
455
456                if let Some(va) = self.value_accessor.as_ref() {
457                    va.access(&[BEFORE, &desc.name], &desc.data_type)
458                } else {
459                    self.key_accessor
460                        .as_ref()
461                        .unwrap()
462                        .access(&[&desc.name], &desc.data_type)
463                }
464            }
465
466            // value should not be None.
467            ChangeEventOperation::Upsert => {
468                // For upsert operations, if `desc` is an additional column, access the field under the `SOURCE` field.
469                desc.additional_column.column_type.as_ref().map_or_else(
470                    || {
471                        self.value_accessor
472                            .as_ref()
473                            .expect("value_accessor must be provided for upsert operation")
474                            .access(&[AFTER, &desc.name], &desc.data_type)
475                    },
476                    |additional_column_type| {
477                        match *additional_column_type {
478                            ColumnType::Timestamp(_) => {
479                                // access payload.source.ts_ms
480                                let ts_ms = self
481                                    .value_accessor
482                                    .as_ref()
483                                    .expect("value_accessor must be provided for upsert operation")
484                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
485                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
486                                    Timestamptz::from_millis(scalar.into_int64())
487                                        .expect("source.ts_ms must in millisecond")
488                                        .to_scalar_value()
489                                })))
490                            }
491                            ColumnType::DatabaseName(_) => self
492                                .value_accessor
493                                .as_ref()
494                                .expect("value_accessor must be provided for upsert operation")
495                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
496                            ColumnType::SchemaName(_) => self
497                                .value_accessor
498                                .as_ref()
499                                .expect("value_accessor must be provided for upsert operation")
500                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
501                            ColumnType::TableName(_) => self
502                                .value_accessor
503                                .as_ref()
504                                .expect("value_accessor must be provided for upsert operation")
505                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
506                            ColumnType::CollectionName(_) => self
507                                .value_accessor
508                                .as_ref()
509                                .expect("value_accessor must be provided for upsert operation")
510                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
511                            _ => Err(AccessError::UnsupportedAdditionalColumn {
512                                name: desc.name.clone(),
513                            }),
514                        }
515                    },
516                )
517            }
518        }
519    }
520
521    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
522        if let Some(accessor) = &self.value_accessor {
523            if let Some(ScalarRefImpl::Utf8(op)) =
524                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
525            {
526                match op {
527                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
528                        return Ok(ChangeEventOperation::Upsert);
529                    }
530                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
531                    _ => (),
532                }
533            }
534            Err(super::AccessError::Undefined {
535                name: "op".into(),
536                path: Default::default(),
537            })
538        } else {
539            Ok(ChangeEventOperation::Delete)
540        }
541    }
542}
543
544/// Access support for MongoDB.
545///
546/// For now, per-field (strongly typed) extraction is only performed for `strong_schema` `MongoDB` Debezium event JSONs; otherwise the document is accessed as a whole `payload` JSONB value.
547pub struct MongoJsonAccess<A> {
548    accessor: A,
549    strong_schema: bool,
550}
551
552pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
553    let id_field = if let Some(value) = bson_doc.get("_id") {
554        value
555    } else {
556        bson_doc
557    };
558
559    let type_error = || AccessError::TypeError {
560        expected: id_type.to_string(),
561        got: match id_field {
562            serde_json::Value::Null => "null",
563            serde_json::Value::Bool(_) => "bool",
564            serde_json::Value::Number(_) => "number",
565            serde_json::Value::String(_) => "string",
566            serde_json::Value::Array(_) => "array",
567            serde_json::Value::Object(_) => "object",
568        }
569        .to_owned(),
570        value: id_field.to_string(),
571    };
572
573    let id: Datum = match id_type {
574        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
575        DataType::Varchar => match id_field {
576            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
577            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
578                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
579            )),
580            _ => return Err(type_error()),
581        },
582        DataType::Int32 => {
583            if let serde_json::Value::Object(obj) = id_field
584                && obj.contains_key("$numberInt")
585            {
586                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
587                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
588            } else {
589                return Err(type_error());
590            }
591        }
592        DataType::Int64 => {
593            if let serde_json::Value::Object(obj) = id_field
594                && obj.contains_key("$numberLong")
595            {
596                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
597                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
598            } else {
599                return Err(type_error());
600            }
601        }
602        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
603    };
604    Ok(id)
605}
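
// Illustrative `_id` extractions (the object ids and numbers are made up):
//
//   {"_id": {"$oid": "65bc9fb6c485f419a7a877fe"}} with DataType::Varchar -> "65bc9fb6c485f419a7a877fe"
//   {"_id": {"$numberLong": "1630454400000"}}     with DataType::Int64   -> 1630454400000
//   {"_id": {"$numberInt": "42"}}                 with DataType::Int32   -> 42
//   {"_id": "order-1"}                            with DataType::Varchar -> "order-1"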
606
607/// Extract the field data from the bson document
608///
609/// BSON document is a JSON object with some special fields, such as:
610/// long integer: {"$numberLong": "1630454400000"}
611/// date time: {"$date": {"$numberLong": "1630454400000"}}
612///
613/// Dates are accepted in the Canonical, Relaxed, and legacy (v1) numeric forms; timestamps only in the `$timestamp` object form (which is identical in Canonical and Relaxed).
614///
615/// # NOTE:
616///
617/// - `field` indicates the field name in the bson document; if it is `None`, the `bson_doc` is the field itself.
618// This is similar to extracting the "_id" field from the message payload.
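//
// Illustrative field extractions (the shapes follow MongoDB Extended JSON; values are made up):
//
//   {"price": {"$numberDouble": "9.99"}}                     with DataType::Float64       -> 9.99
//   {"created": {"$date": {"$numberLong": "1630454400000"}}} with DataType::Timestamptz   -> 2021-09-01 00:00:00+00:00
//   {"tags": ["a", "b"]}                                     with DataType::List(Varchar) -> ["a", "b"]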
619pub fn extract_bson_field(
620    type_expected: &DataType,
621    bson_doc: &serde_json::Value,
622    field: Option<&str>,
623) -> AccessResult {
624    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
625        expected: type_expected.to_string(),
626        got: match bson_doc {
627            serde_json::Value::Null => "null",
628            serde_json::Value::Bool(_) => "bool",
629            serde_json::Value::Number(_) => "number",
630            serde_json::Value::String(_) => "string",
631            serde_json::Value::Array(_) => "array",
632            serde_json::Value::Object(_) => "object",
633        }
634        .to_owned(),
635        value: datum.to_string(),
636    };
637
638    let datum = if let Some(field) = field {
639        let Some(bson_doc) = bson_doc.get(field) else {
640            return Err(type_error(bson_doc));
641        };
642        bson_doc
643    } else {
644        bson_doc
645    };
646
647    if datum.is_null() {
648        return Ok(None);
649    }
650
651    let field_datum: Datum = match type_expected {
652        DataType::Boolean => {
653            if datum.is_boolean() {
654                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
655            } else {
656                return Err(type_error(datum));
657            }
658        }
659        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
660        DataType::Varchar => match datum {
661            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
662            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
663                obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
664            }
665            _ => return Err(type_error(datum)),
666        },
667        DataType::Int16
668        | DataType::Int32
669        | DataType::Int64
670        | DataType::Int256
671        | DataType::Float32
672        | DataType::Float64 => {
673            if !datum.is_object() {
674                return Err(type_error(datum));
675            };
676
677            bson_extract_number(datum, type_expected)?
678        }
679
680        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
681            if let serde_json::Value::Object(mp) = datum {
682                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
683                    bson_extract_timestamp(datum, type_expected)?
684                } else if mp.contains_key("$date") {
685                    bson_extract_date(datum, type_expected)?
686                } else {
687                    return Err(type_error(datum));
688                }
689            } else {
690                return Err(type_error(datum));
691            }
692        }
693        DataType::Decimal => {
694            if let serde_json::Value::Object(obj) = datum
695                && obj.contains_key("$numberDecimal")
696                && obj["$numberDecimal"].is_string()
697            {
698                let number = obj["$numberDecimal"].as_str().unwrap();
699
700                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
701                    AccessError::TypeError {
702                        expected: type_expected.to_string(),
703                        got: "unparsable string".into(),
704                        value: number.to_owned(),
705                    }
706                })?;
707                Some(ScalarImpl::Decimal(dec))
708            } else {
709                return Err(type_error(datum));
710            }
711        }
712
713        DataType::Bytea => {
714            if let serde_json::Value::Object(obj) = datum
715                && obj.contains_key("$binary")
716                && obj["$binary"].is_object()
717            {
718                use base64::Engine;
719
720                let binary = obj["$binary"].as_object().unwrap();
721
722                if !binary.contains_key("$base64")
723                    || !binary["$base64"].is_string()
724                    || !binary.contains_key("$subType")
725                    || !binary["$subType"].is_string()
726                {
727                    return Err(AccessError::TypeError {
728                        expected: type_expected.to_string(),
729                        got: "object".into(),
730                        value: datum.to_string(),
731                    });
732                }
733
734                let b64_str = binary["$base64"]
735                    .as_str()
736                    .ok_or_else(|| AccessError::TypeError {
737                        expected: type_expected.to_string(),
738                        got: "object".into(),
739                        value: datum.to_string(),
740                    })?;
741
742                // type is not used for now
743                let _type_str =
744                    binary["$subType"]
745                        .as_str()
746                        .ok_or_else(|| AccessError::TypeError {
747                            expected: type_expected.to_string(),
748                            got: "object".into(),
749                            value: datum.to_string(),
750                        })?;
751
752                let bytes = base64::prelude::BASE64_STANDARD
753                    .decode(b64_str)
754                    .map_err(|_| AccessError::TypeError {
755                        expected: "$binary object with $base64 string and $subType string field"
756                            .to_owned(),
757                        got: "string".to_owned(),
758                        value: bson_doc.to_string(),
759                    })?;
760                let bytea = ScalarImpl::Bytea(bytes.into());
761                Some(bytea)
762            } else {
763                return Err(type_error(datum));
764            }
765        }
766
767        DataType::Struct(struct_fields) => {
768            let mut datums = vec![];
769            for (field_name, field_type) in struct_fields.iter() {
770                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
771                datums.push(field_datum);
772            }
773            let value = StructValue::new(datums);
774
775            Some(ScalarImpl::Struct(value))
776        }
777
778        DataType::List(list_type) => {
779            let Some(d_array) = datum.as_array() else {
780                return Err(type_error(datum));
781            };
782
783            let mut builder = list_type.create_array_builder(d_array.len());
784            for item in d_array {
785                builder.append(extract_bson_field(list_type, item, None)?);
786            }
787            Some(ScalarImpl::from(ListValue::new(builder.finish())))
788        }
789
790        _ => {
791            if let Some(field_name) = field {
792                unreachable!(
793                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
794                )
795            } else {
796                let type_expected = type_expected.to_string();
797                unreachable!(
798                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
799                )
800            }
801        }
802    };
803    Ok(field_datum)
804}
805
806fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
807    let field_name = match type_expected {
808        DataType::Int16 => "$numberInt",
809        DataType::Int32 => "$numberInt",
810        DataType::Int64 => "$numberLong",
811        DataType::Int256 => "$numberLong",
812        DataType::Float32 => "$numberDouble",
813        DataType::Float64 => "$numberDouble",
814        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
815    };
816
817    let datum = bson_doc.get(field_name);
818    if datum.is_none() {
819        return Err(AccessError::TypeError {
820            expected: type_expected.to_string(),
821            got: "object".into(),
822            value: bson_doc.to_string(),
823        });
824    }
825
826    let datum = datum.unwrap();
827
828    if datum.is_string() {
829        let Some(num_str) = datum.as_str() else {
830            return Err(AccessError::TypeError {
831                expected: type_expected.to_string(),
832                got: "string".into(),
833                value: datum.to_string(),
834            });
835        };
836        // parse to float
837        if [DataType::Float32, DataType::Float64].contains(type_expected) {
838            match (num_str, type_expected) {
839                ("Infinity", DataType::Float64) => {
840                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
841                }
842                ("Infinity", DataType::Float32) => {
843                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
844                }
845                ("-Infinity", DataType::Float64) => {
846                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
847                }
848                ("-Infinity", DataType::Float32) => {
849                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
850                }
851                ("NaN", DataType::Float64) => {
852                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
853                }
854                ("NaN", DataType::Float32) => {
855                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
856                }
857                _ => {}
858            }
859
860            let parsed_num: f64 = match num_str.parse() {
861                Ok(n) => n,
862                Err(_e) => {
863                    return Err(AccessError::TypeError {
864                        expected: type_expected.to_string(),
865                        got: "string".into(),
866                        value: num_str.to_owned(),
867                    });
868                }
869            };
870            if *type_expected == DataType::Float64 {
871                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
872            } else {
873                let parsed_num = parsed_num as f32;
874                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
875            }
876        }
877        // parse to large int
878        if *type_expected == DataType::Int256 {
879            let parsed_num = match Int256::from_str(num_str) {
880                Ok(n) => n,
881                Err(_) => {
882                    return Err(AccessError::TypeError {
883                        expected: type_expected.to_string(),
884                        got: "string".into(),
885                        value: num_str.to_owned(),
886                    });
887                }
888            };
889            return Ok(Some(ScalarImpl::Int256(parsed_num)));
890        }
891
892        // parse to integer
893        let parsed_num: i64 = match num_str.parse() {
894            Ok(n) => n,
895            Err(_e) => {
896                return Err(AccessError::TypeError {
897                    expected: type_expected.to_string(),
898                    got: "string".into(),
899                    value: num_str.to_owned(),
900                });
901            }
902        };
903        match type_expected {
904            DataType::Int16 => {
905                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
906                    return Err(AccessError::TypeError {
907                        expected: type_expected.to_string(),
908                        got: "string".into(),
909                        value: num_str.to_owned(),
910                    });
911                }
912                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
913            }
914            DataType::Int32 => {
915                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
916                    return Err(AccessError::TypeError {
917                        expected: type_expected.to_string(),
918                        got: "string".into(),
919                        value: num_str.to_owned(),
920                    });
921                }
922                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
923            }
924            DataType::Int64 => {
925                return Ok(Some(ScalarImpl::Int64(parsed_num)));
926            }
927            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
928        }
929    }
930    if datum.is_null() {
931        return Err(AccessError::TypeError {
932            expected: type_expected.to_string(),
933            got: "null".into(),
934            value: bson_doc.to_string(),
935        });
936    }
937
938    if datum.is_array() {
939        return Err(AccessError::TypeError {
940            expected: type_expected.to_string(),
941            got: "array".to_owned(),
942            value: datum.to_string(),
943        });
944    }
945
946    if datum.is_object() {
947        return Err(AccessError::TypeError {
948            expected: type_expected.to_string(),
949            got: "object".to_owned(),
950            value: datum.to_string(),
951        });
952    }
953
954    if datum.is_boolean() {
955        return Err(AccessError::TypeError {
956            expected: type_expected.to_string(),
957            got: "boolean".into(),
958            value: bson_doc.to_string(),
959        });
960    }
961
962    if datum.is_number() {
963        let got_type = if datum.is_f64() { "f64" } else { "i64" };
964        return Err(AccessError::TypeError {
965            expected: type_expected.to_string(),
966            got: got_type.into(),
967            value: bson_doc.to_string(),
968        });
969    }
970
971    Err(AccessError::TypeError {
972        expected: type_expected.to_string(),
973        got: "unknown".into(),
974        value: bson_doc.to_string(),
975    })
976}
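
// Illustrative number extractions (assumed extended-JSON shapes):
//
//   {"$numberInt": "7"}           with DataType::Int32   -> 7
//   {"$numberLong": "1630454400"} with DataType::Int64   -> 1630454400
//   {"$numberDouble": "NaN"}      with DataType::Float64 -> NaN
//   {"$numberDouble": "1e10"}     with DataType::Int32   -> type error ($numberInt is expected)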
977
978fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
979    // According to MongoDB Extended JSON v2, a date ("$date") can be encoded as:
980    //
981    // Canonical format:
982    // {"$date": {"$numberLong": "1630454400000"}}
983    // i.e. the number of milliseconds since the Unix epoch;
984    //
985    // Relaxed format:
986    // {"$date": "2021-09-01T00:00:00.000Z"}
987    // i.e. an ISO-8601 string;
988    //
989    // or the legacy (v1) format:
990    // {"$date": 1630454400000}
991    // i.e. a plain number of milliseconds since the Unix epoch.
992
993    let datum = &bson_doc["$date"];
994
995    let type_error = || AccessError::TypeError {
996        expected: type_expected.to_string(),
997        got: match bson_doc {
998            serde_json::Value::Null => "null",
999            serde_json::Value::Bool(_) => "bool",
1000            serde_json::Value::Number(_) => "number",
1001            serde_json::Value::String(_) => "string",
1002            serde_json::Value::Array(_) => "array",
1003            serde_json::Value::Object(_) => "object",
1004        }
1005        .to_owned(),
1006        value: datum.to_string(),
1007    };
1008
1009    // handle the Canonical, Relaxed, and legacy (v1) formats
1010    let millis = match datum {
1011        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
1012        serde_json::Value::Object(obj)
1013            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1014        {
1015            obj["$numberLong"]
1016                .as_str()
1017                .unwrap()
1018                .parse::<i64>()
1019                .map_err(|_| AccessError::TypeError {
1020                    expected: "timestamp".into(),
1021                    got: "object".into(),
1022                    value: datum.to_string(),
1023                })?
1024        }
1025        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
1026        serde_json::Value::String(s) => {
1027            let dt =
1028                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1029                    expected: "valid ISO-8601 date string".into(),
1030                    got: "string".into(),
1031                    value: datum.to_string(),
1032                })?;
1033            dt.timestamp_millis()
1034        }
1035
1036        // jsonv1 format
1037        // {"$date": 1630454400000}
1038        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1039            expected: "timestamp".into(),
1040            got: "number".into(),
1041            value: datum.to_string(),
1042        })?,
1043
1044        _ => return Err(type_error()),
1045    };
1046
1047    let datetime =
1048        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1049            expected: "timestamp".into(),
1050            got: "object".into(),
1051            value: datum.to_string(),
1052        })?;
1053
1054    let res = match type_expected {
1055        DataType::Date => {
1056            let naive = datetime.naive_local();
1057            let dt = naive.date();
1058            Some(ScalarImpl::Date(dt.into()))
1059        }
1060        DataType::Time => {
1061            let naive = datetime.naive_local();
1062            let dt = naive.time();
1063            Some(ScalarImpl::Time(dt.into()))
1064        }
1065        DataType::Timestamp => {
1066            let naive = datetime.naive_local();
1067            let dt = Timestamp::from(naive);
1068            Some(ScalarImpl::Timestamp(dt))
1069        }
1070        DataType::Timestamptz => {
1071            let dt = datetime.into();
1072            Some(ScalarImpl::Timestamptz(dt))
1073        }
1074        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1075    };
1076    Ok(res)
1077}
1078
1079fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1080    // According to MongoDB Extended JSON v2, the timestamp type has the same
1081    // representation in both the Canonical and the Relaxed format:
1082    //
1083    // {"$timestamp": {"t": 1630454400, "i": 1}}
1084    //
1085    // where `t` is the number of seconds since the Unix epoch and `i` is an
1086    // increment ordering operations within the same second.
1087    //
1088    // Dates, by contrast, are encoded under "$date":
1089    //
1090    // Canonical: {"$date": {"$numberLong": "1630454400000"}} (milliseconds since the Unix epoch)
1091    // Relaxed:   {"$date": "2021-09-01T00:00:00.000Z"}       (ISO-8601 string)
1092    //
1093    // and are handled by `bson_extract_date` instead.
1094    //
1095    // This function handles the "$timestamp" form only.
1096
1097    let Some(obj) = bson_doc["$timestamp"].as_object() else {
1098        return Err(AccessError::TypeError {
1099            expected: "timestamp".into(),
1100            got: "object".into(),
1101            value: bson_doc.to_string(),
1102        });
1103    };
1104
1105    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1106    {
1107        return Err(AccessError::TypeError {
1108            expected: "timestamp with valid seconds since epoch".into(),
1109            got: "object".into(),
1110            value: bson_doc.to_string(),
1111        });
1112    }
1113
1114    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1115        expected: "timestamp with valid seconds since epoch".into(),
1116        got: "object".into(),
1117        value: bson_doc.to_string(),
1118    })?;
1119
1120    let chrono_datetime =
1121        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1122            expected: type_expected.to_string(),
1123            got: "object".to_owned(),
1124            value: bson_doc.to_string(),
1125        })?;
1126
1127    let res = match type_expected {
1128        DataType::Date => {
1129            let naive = chrono_datetime.naive_local();
1130            let dt = naive.date();
1131            Some(ScalarImpl::Date(dt.into()))
1132        }
1133        DataType::Time => {
1134            let naive = chrono_datetime.naive_local();
1135            let dt = naive.time();
1136            Some(ScalarImpl::Time(dt.into()))
1137        }
1138        DataType::Timestamp => {
1139            let naive = chrono_datetime.naive_local();
1140            let dt = Timestamp::from(naive);
1141            Some(ScalarImpl::Timestamp(dt))
1142        }
1143        DataType::Timestamptz => {
1144            let dt = chrono_datetime.into();
1145            Some(ScalarImpl::Timestamptz(dt))
1146        }
1147        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1148    };
1149
1150    Ok(res)
1151}
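
// Illustrative timestamp extractions:
//
//   {"$timestamp": {"t": 1630454400, "i": 1}} with DataType::Timestamptz -> 2021-09-01 00:00:00+00:00
//   {"$timestamp": {"t": 1630454400, "i": 1}} with DataType::Date        -> 2021-09-01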
1152
1153impl<A> MongoJsonAccess<A> {
1154    pub fn new(accessor: A, strong_schema: bool) -> Self {
1155        Self {
1156            accessor,
1157            strong_schema,
1158        }
1159    }
1160}
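
// Path resolution sketch (illustrative): with `strong_schema == true`,
// `access(&["after", "price"], &DataType::Float64)` extracts the typed `price` field from the
// "after" BSON document; with `strong_schema == false`, the document is instead exposed as a
// single JSONB value via the `["after", "payload"]` path, and other paths fall through to the
// underlying accessor.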
1161
1162impl<A> Access for MongoJsonAccess<A>
1163where
1164    A: Access,
1165{
1166    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1167        match path {
1168            ["after" | "before", "_id"] => {
1169                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1170                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1171                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1172                } else {
1173                    // fail to extract the "_id" field from the message payload
1174                    Err(AccessError::Undefined {
1175                        name: "_id".to_owned(),
1176                        path: path[0].to_owned(),
1177                    })?
1178                }
1179            }
1180
1181            ["after" | "before", "payload"] if !self.strong_schema => {
1182                self.access(&[path[0]], &DataType::Jsonb)
1183            }
1184
1185            ["after" | "before", field] if self.strong_schema => {
1186                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1187                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1188                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1189                } else {
1190                    // fail to extract the expected field from the message payload
1191                    Err(AccessError::Undefined {
1192                        name: field.to_string(),
1193                        path: path[0].to_owned(),
1194                    })?
1195                }
1196            }
1197
1198            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
1199            // In addition, the "_id" field is named "id" in the key. An example of a message key:
1200            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
1201            ["_id"] => {
1202                let ret = self.accessor.access(path, type_expected);
1203                if matches!(ret, Err(AccessError::Undefined { .. })) {
1204                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1205                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1206                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1207                    } else {
1208                        // fail to extract the "_id" field from the message key
1209                        Err(AccessError::Undefined {
1210                            name: "_id".to_owned(),
1211                            path: "id".to_owned(),
1212                        })?
1213                    }
1214                } else {
1215                    ret
1216                }
1217            }
1218            _ => self.accessor.access(path, type_expected),
1219        }
1220    }
1221}