Skip to main content

risingwave_connector/parser/unified/
debezium.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29
/// JDBC type codes (the `java.sql.Types` constants) that Debezium reports in the
/// `jdbcType` field of column descriptors inside schema change events.
mod debezium_sql_types {
    /// `java.sql.Types.STRUCT`: a structured (composite) value.
    pub const STRUCT: i32 = 2002;
    /// `java.sql.Types.ARRAY`: an array value.
    pub const ARRAY: i32 = 2003;
}
35use crate::connector_common::{create_pg_client_from_properties, discover_pgvector_dimensions};
36use crate::parser::TransactionControl;
37use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
38use crate::parser::schema_change::TableChangeType;
39use crate::source::cdc::build_cdc_table_id;
40use crate::source::cdc::external::mysql::{
41    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
42};
43use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
44use crate::source::{ConnectorProperties, SourceColumnDesc};
45
/// Extract `(schema_name, table_name)` from a Debezium `tableChanges[].id`.
///
/// Debezium emits the id in several shapes depending on connector and version:
/// - `"public"."orders"`: every part double-quoted;
/// - `public.orders`: plain dotted;
/// - `db.public.orders`: plain dotted with a database prefix.
///
/// The fully-quoted form is tried first, splitting on `"."` so that dots inside quoted
/// identifiers survive. If that is not applicable, or yields an empty part, the id is
/// parsed as plain dotted text. In both cases the last two parts are taken as schema and
/// table and are whitespace-trimmed.
///
/// Returns `None` when fewer than two parts exist or either picked part is empty.
fn parse_schema_table_from_debezium_id(id: &str) -> Option<(String, String)> {
    /// Separator between fully double-quoted identifier parts.
    const QUOTED_SEPARATOR: &str = "\".\"";

    /// Take the last two `segments` as `(schema, table)` after passing each through
    /// `normalize`; `None` unless there are at least two segments and both are non-empty.
    fn last_two<'a>(
        segments: impl Iterator<Item = &'a str>,
        normalize: impl Fn(&'a str) -> &'a str,
    ) -> Option<(String, String)> {
        let segments: Vec<&'a str> = segments.collect();
        let [.., schema, table] = segments.as_slice() else {
            return None;
        };
        let (schema, table) = (normalize(*schema), normalize(*table));
        if schema.is_empty() || table.is_empty() {
            None
        } else {
            Some((schema.to_owned(), table.to_owned()))
        }
    }

    let trimmed = id.trim();

    let quoted = if trimmed.contains(QUOTED_SEPARATOR) {
        last_two(trimmed.split(QUOTED_SEPARATOR), |part| {
            part.trim_matches('"').trim()
        })
    } else {
        None
    };

    quoted.or_else(|| last_two(trimmed.trim_matches('"').split('.'), str::trim))
}
87
88async fn fetch_pgvector_dimensions_for_table(
89    connector_props: &ConnectorProperties,
90    schema: &str,
91    table: &str,
92) -> AccessResult<std::collections::HashMap<String, usize>> {
93    let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
94        return Ok(std::collections::HashMap::new());
95    };
96
97    let client = create_pg_client_from_properties(&cdc_props.properties, None)
98        .await
99        .map_err(|err| AccessError::Uncategorized {
100            message: format!(
101                "failed to connect upstream postgres for schema change lookup: {}",
102                err.as_report()
103            ),
104        })?;
105
106    discover_pgvector_dimensions(&client, schema, table)
107        .await
108        .map_err(|err| AccessError::Uncategorized {
109            message: format!(
110                "failed to query upstream postgres schema for {schema}.{table}: {}",
111                err.as_report()
112            ),
113        })
114}
115
116/// Decide whether an unknown array column type (one not resolved by
117/// `type_name_to_pg_type`) can be represented as `varchar[]` in RW. This is the
118/// single question this function answers; the criteria behind it may grow over
119/// time.
120///
121/// Right now the answer is "yes iff the array's element type is a user-defined
122/// enum". Debezium does not expose element enum metadata on the array column
123/// (the `enumValues` field is only set for scalar enum columns), so we ask the
124/// upstream catalog directly: follow the array type's `typelem` to its element
125/// type and check `typtype = 'e'`. Enum values are plain text, hence `varchar[]`.
126///
127/// TODO(composite): arrays of composite types (`typtype = 'c'`) should likewise
128/// map to `varchar[]`. That is deferred to a follow-up PR — the snapshot/streaming
129/// side for composite arrays is handled in #25818, and this predicate should be
130/// extended to `typtype IN ('e', 'c')` once that lands.
131async fn can_fallback_array_to_varchar(
132    connector_props: &ConnectorProperties,
133    array_type_name: &str,
134) -> AccessResult<bool> {
135    let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
136        return Ok(false);
137    };
138
139    let client = create_pg_client_from_properties(&cdc_props.properties, None)
140        .await
141        .map_err(|err| AccessError::Uncategorized {
142            message: format!(
143                "failed to connect upstream postgres for schema change lookup: {}",
144                err.as_report()
145            ),
146        })?;
147
148    let row = client
149        .query_opt(
150            "SELECT t_elem.typtype = 'e' \
151             FROM pg_type t \
152             JOIN pg_type t_elem ON t.typelem = t_elem.oid \
153             WHERE t.typname = $1 \
154             LIMIT 1",
155            &[&array_type_name],
156        )
157        .await
158        .map_err(|err| AccessError::Uncategorized {
159            message: format!(
160                "failed to query upstream postgres for array element type of `{array_type_name}`: {}",
161                err.as_report()
162            ),
163        })?;
164
165    Ok(row.map(|r| r.get::<_, bool>(0)).unwrap_or(false))
166}
167
/// A Debezium change event, read through a key accessor and/or a value accessor.
///
/// Example of a Debezium JSON value (a MySQL snapshot read, `"op": "r"`):
///
/// ```json
/// {
///     "payload":
///     {
///         "before": null,
///         "after":
///         {
///             "O_ORDERKEY": 5,
///             "O_CUSTKEY": 44485,
///             "O_ORDERSTATUS": "F",
///             "O_TOTALPRICE": "144659.20",
///             "O_ORDERDATE": "1994-07-30"
///         },
///         "source":
///         {
///             "version": "1.9.7.Final",
///             "connector": "mysql",
///             "name": "RW_CDC_1002",
///             "ts_ms": 1695277757000,
///             "db": "mydb",
///             "sequence": null,
///             "table": "orders",
///             "server_id": 0,
///             "gtid": null,
///             "file": "binlog.000008",
///             "pos": 3693,
///             "row": 0
///         },
///         "op": "r",
///         "ts_ms": 1695277757017,
///         "transaction": null
///     }
/// }
/// ```
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message key. It is read for MongoDB `_id` on deletes, and for
    /// deletes that carry no value.
    key_accessor: Option<A>,
    /// Accessor over the message value (`before` / `after` / `source` / `op` fields).
    value_accessor: Option<A>,
    /// `true` for events built by `new_mongodb_event`. It enables MongoDB-specific handling.
    is_mongodb: bool,
}
206
// Row images in the payload of a data change event.
const BEFORE: &str = "before";
const AFTER: &str = "after";

// Upstream DDL text of a schema change event.
const UPSTREAM_DDL: &str = "ddl";

// The `source` metadata object and keys read from inside it.
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

// Operation code of a data change event (one of the `DEBEZIUM_*_OP` values below).
const OP: &str = "op";

/// Field of a transaction metadata message holding `BEGIN` / `END`.
pub const TRANSACTION_STATUS: &str = "status";
/// Field of a transaction metadata message holding the transaction id.
pub const TRANSACTION_ID: &str = "id";

/// List of per-table changes in a schema change event.
pub const TABLE_CHANGES: &str = "tableChanges";

/// Operation code: snapshot read.
pub const DEBEZIUM_READ_OP: &str = "r";
/// Operation code: insert.
pub const DEBEZIUM_CREATE_OP: &str = "c";
/// Operation code: update.
pub const DEBEZIUM_UPDATE_OP: &str = "u";
/// Operation code: delete.
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// Transaction status marking the start of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// Transaction status marking the end (commit) of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
231
232pub fn parse_transaction_meta(
233    accessor: &impl Access,
234    connector_props: &ConnectorProperties,
235) -> AccessResult<TransactionControl> {
236    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
237        accessor
238            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
239            .to_datum_ref(),
240        accessor
241            .access(&[TRANSACTION_ID], &DataType::Varchar)?
242            .to_datum_ref(),
243    ) {
244        // The id field has different meanings for different databases:
245        // PG: txID:LSN
246        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
247        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
248        match status {
249            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
250                ConnectorProperties::PostgresCdc(_) => {
251                    let (tx_id, _) = id.split_once(':').unwrap();
252                    return Ok(TransactionControl::Begin { id: tx_id.into() });
253                }
254                ConnectorProperties::MysqlCdc(_) => {
255                    return Ok(TransactionControl::Begin { id: id.into() });
256                }
257                ConnectorProperties::SqlServerCdc(_) => {
258                    return Ok(TransactionControl::Begin { id: id.into() });
259                }
260                _ => {}
261            },
262            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
263                ConnectorProperties::PostgresCdc(_) => {
264                    let (tx_id, _) = id.split_once(':').unwrap();
265                    return Ok(TransactionControl::Commit { id: tx_id.into() });
266                }
267                ConnectorProperties::MysqlCdc(_) => {
268                    return Ok(TransactionControl::Commit { id: id.into() });
269                }
270                ConnectorProperties::SqlServerCdc(_) => {
271                    return Ok(TransactionControl::Commit { id: id.into() });
272                }
273                _ => {}
274            },
275            _ => {}
276        }
277    }
278
279    Err(AccessError::Undefined {
280        name: "transaction status".into(),
281        path: TRANSACTION_STATUS.into(),
282    })
283}
284
/// Read field `$key` of the JSONB value `$jsonb` and convert it with the matching
/// `as_<$kind>` accessor (e.g. `string` expands to `.as_string()`).
///
/// Panics if the field is absent or the conversion fails (both results are `unwrap`ped).
macro_rules! jsonb_access_field {
    ($jsonb:expr, $key:expr, $kind:tt) => {
        $crate::paste! {
            $jsonb.access_object_field($key).unwrap().[<as_ $kind>]().unwrap()
        }
    };
}
292
293/// Parse the schema change message from Debezium.
294/// The layout of MySQL schema change message can refer to
295/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
296pub async fn parse_schema_change(
297    accessor: &impl Access,
298    source_id: SourceId,
299    source_name: &str,
300    connector_props: &ConnectorProperties,
301) -> AccessResult<SchemaChangeEnvelope> {
302    let mut schema_changes = vec![];
303    let mut pgvector_dims_cache: std::collections::HashMap<
304        (String, String),
305        std::collections::HashMap<String, usize>,
306    > = std::collections::HashMap::new();
307
308    let upstream_ddl: String = accessor
309        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
310        .to_owned_datum()
311        .unwrap()
312        .as_utf8()
313        .to_string();
314
315    if let Some(ScalarRefImpl::List(table_changes)) = accessor
316        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
317        .to_datum_ref()
318    {
319        for datum in table_changes.iter() {
320            let jsonb = match datum {
321                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
322                _ => unreachable!(""),
323            };
324            let id: String = jsonb_access_field!(jsonb, "id", string);
325            let ty = jsonb_access_field!(jsonb, "type", string);
326
327            let table_name = id.trim_matches('"').to_owned();
328            let ddl_type: TableChangeType = ty.as_str().into();
329            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
330                tracing::debug!("skip table schema change for create/drop command");
331                continue;
332            }
333
334            let mut column_descs: Vec<ColumnDesc> = vec![];
335            if let Some(table) = jsonb.access_object_field("table")
336                && let Some(columns) = table.access_object_field("columns")
337            {
338                for col in columns.array_elements().unwrap() {
339                    let name = jsonb_access_field!(col, "name", string);
340                    let type_name = jsonb_access_field!(col, "typeName", string);
341                    // User-defined types (enum, composite) are not in the
342                    // `type_name_to_pg_type` whitelist because their type names are
343                    // user-chosen. We detect them via Debezium metadata instead:
344                    //  - Enum: has a non-null `enumValues` field in the column descriptor.
345                    //  - Composite (STRUCT): identified by `jdbcType == 2002`.
346                    // Both are mapped to Varchar — enum values are plain strings, and
347                    // composite values are converted to text by our CustomConverter.
348                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
349                    let jdbc_type = col
350                        .access_object_field("jdbcType")
351                        .and_then(|v| v.as_number().ok())
352                        .map(|n| n.0 as i32);
353                    let is_composite = jdbc_type == Some(debezium_sql_types::STRUCT);
354
355                    let data_type = match *connector_props {
356                        ConnectorProperties::PostgresCdc(_) => {
357                            if is_composite || is_enum {
358                                tracing::debug!(target: "auto_schema_change",
359                                    "Convert PostgreSQL user-defined type '{}' ({}) to VARCHAR",
360                                    type_name,
361                                    if is_composite { "composite" } else { "enum" });
362                                DataType::Varchar
363                            } else if type_name.eq_ignore_ascii_case("vector") {
364                                let Some((schema_name, table_name_only)) =
365                                    parse_schema_table_from_debezium_id(id.as_str())
366                                else {
367                                    return Err(AccessError::CdcAutoSchemaChangeError {
368                                        ty: type_name,
369                                        table_name: format!("{}.{}", source_name, table_name),
370                                    });
371                                };
372
373                                // Cache by normalized `(schema, table)` tuple to avoid split/join churn on id text.
374                                let cache_key = (schema_name, table_name_only);
375                                if !pgvector_dims_cache.contains_key(&cache_key) {
376                                    let fetched = fetch_pgvector_dimensions_for_table(
377                                        connector_props,
378                                        &cache_key.0,
379                                        &cache_key.1,
380                                    )
381                                    .await?;
382                                    pgvector_dims_cache.insert(cache_key.clone(), fetched);
383                                }
384
385                                match pgvector_dims_cache
386                                    .get(&cache_key)
387                                    .and_then(|m| m.get(name.as_str()).copied())
388                                {
389                                    Some(dim) if (1..=DataType::VEC_MAX_SIZE).contains(&dim) => {
390                                        DataType::Vector(dim)
391                                    }
392                                    _ => {
393                                        // No fallback to VARCHAR: a dimension-less vector cannot be mapped to
394                                        // RW's required `vector(n)` type safely.
395                                        return Err(AccessError::CdcAutoSchemaChangeError {
396                                            ty: type_name,
397                                            table_name: format!("{}.{}", source_name, table_name),
398                                        });
399                                    }
400                                }
401                            } else {
402                                // Resolve builtin types first, so arrays of builtin element
403                                // types keep their proper element type (e.g. `_int4` -> int[],
404                                // `_text` -> text[]). `type_name_to_pg_type` only covers PG
405                                // builtins (the `PgType` it returns carries a static OID);
406                                // user-defined and extension types are not representable here
407                                // and fall through to the catalog lookup below.
408                                let ty = type_name_to_pg_type(type_name.as_str());
409                                match ty {
410                                    Some(ty) => match pg_type_to_rw_type(&ty) {
411                                        Ok(data_type) => data_type,
412                                        Err(err) => {
413                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
414                                            return Err(AccessError::CdcAutoSchemaChangeError {
415                                                ty: type_name,
416                                                table_name: format!(
417                                                    "{}.{}",
418                                                    source_name, table_name
419                                                ),
420                                            });
421                                        }
422                                    },
423                                    // An unrecognized ARRAY type may be an array of a
424                                    // user-defined enum (e.g. `_mood_enum`). Debezium carries
425                                    // no enum metadata on the array column, so ask upstream
426                                    // whether the element is an enum; if so, map to
427                                    // `varchar[]` (enum values are plain text).
428                                    None if jdbc_type == Some(debezium_sql_types::ARRAY)
429                                        && can_fallback_array_to_varchar(
430                                            connector_props,
431                                            type_name.as_str(),
432                                        )
433                                        .await? =>
434                                    {
435                                        tracing::debug!(target: "auto_schema_change",
436                                            "Fall back PostgreSQL array type '{}' to VARCHAR[]",
437                                            type_name);
438                                        DataType::Varchar.list()
439                                    }
440                                    None => {
441                                        return Err(AccessError::CdcAutoSchemaChangeError {
442                                            ty: type_name,
443                                            table_name: format!("{}.{}", source_name, table_name),
444                                        });
445                                    }
446                                }
447                            }
448                        }
449                        ConnectorProperties::MysqlCdc(_) => {
450                            let ty = type_name_to_mysql_type(type_name.as_str());
451                            match ty {
452                                Some(ty) => match mysql_type_to_rw_type(&ty) {
453                                    Ok(data_type) => data_type,
454                                    Err(err) => {
455                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
456                                        return Err(AccessError::CdcAutoSchemaChangeError {
457                                            ty: type_name,
458                                            table_name: format!("{}.{}", source_name, table_name),
459                                        });
460                                    }
461                                },
462                                None => {
463                                    return Err(AccessError::CdcAutoSchemaChangeError {
464                                        ty: type_name,
465                                        table_name: format!("{}.{}", source_name, table_name),
466                                    });
467                                }
468                            }
469                        }
470                        _ => {
471                            unreachable!()
472                        }
473                    };
474
475                    // Handle default value expression. Non-constant defaults (`now()`,
476                    // `gen_random_uuid()`, `nextval('seq'::regclass)` for BIGSERIAL, etc.)
477                    // will fail `ScalarImpl::from_text` and fall through to the fail-open
478                    // branch below — the column is added without a default and a warning is
479                    // logged, rather than aborting the whole auto schema change.
480                    //
481                    // TODO: the schema change event carries the **full** set of columns of
482                    // the table, so here we cannot tell "columns newly added by this ALTER"
483                    // from "columns that already existed". A better approach would be to
484                    // only process the delta columns that actually changed in this event,
485                    // which would let us precisely tell the user: which columns have
486                    // existing rows filled with NULL (needs attention), and which ones are
487                    // pre-existing columns merely surfaced by the event (harmless, can be
488                    // ignored silently).
489                    let column_desc = match col.access_object_field("defaultValueExpression") {
490                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
491                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
492                            let value_text: Option<String>;
493                            match *connector_props {
494                                ConnectorProperties::PostgresCdc(_) => {
495                                    // default value of non-number data type will be stored as
496                                    // "'value'::type"
497                                    match default_val_expr_str
498                                        .split("::")
499                                        .map(|s| s.trim_matches('\''))
500                                        .next()
501                                    {
502                                        None => {
503                                            value_text = None;
504                                        }
505                                        Some(val_text) => {
506                                            value_text = Some(val_text.to_owned());
507                                        }
508                                    }
509                                }
510                                ConnectorProperties::MysqlCdc(_) => {
511                                    // mysql timestamp is mapped to timestamptz, we use UTC timezone to
512                                    // interpret its value
513                                    if data_type == DataType::Timestamptz {
514                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
515                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
516                                            AccessError::TypeError {
517                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
518                                                got: data_type.to_string(),
519                                                value: default_val_expr_str.to_owned(),
520                                            }
521                                        })?);
522                                    } else {
523                                        value_text = Some(default_val_expr_str.to_owned());
524                                    }
525                                }
526                                _ => {
527                                    unreachable!("connector doesn't support schema change")
528                                }
529                            }
530
531                            let snapshot_value: Datum = value_text.and_then(|value_text| {
532                                ScalarImpl::from_text(value_text.as_str(), &data_type)
533                                    .inspect_err(|err| {
534                                        tracing::warn!(
535                                            target: "auto_schema_change",
536                                            error = %err.as_report(),
537                                            column = %name,
538                                            data_type = %data_type,
539                                            default_value_expression = default_val_expr_str,
540                                            upstream_ddl = %upstream_ddl,
541                                            "non-constant default expression, column added without default. \
542                                             If this column is not newly added by this schema change, it is safe to ignore this warning. \
543                                             If this column is newly added by this schema change, existing rows will be NULL in this column — consider using COALESCE in queries to provide a fallback value."
544                                        );
545                                    })
546                                    .ok()
547                            });
548
549                            if snapshot_value.is_none() {
550                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
551                            } else {
552                                ColumnDesc::named_with_default_value(
553                                    name,
554                                    ColumnId::placeholder(),
555                                    data_type,
556                                    snapshot_value,
557                                )
558                            }
559                        }
560                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
561                    };
562                    column_descs.push(column_desc);
563                }
564            }
565
566            // concatenate the source_id to the cdc_table_id
567            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
568            schema_changes.push(TableSchemaChange {
569                cdc_table_id,
570                columns: column_descs
571                    .into_iter()
572                    .map(|column_desc| ColumnCatalog {
573                        column_desc,
574                        is_hidden: false,
575                    })
576                    .collect_vec(),
577                change_type: ty.as_str().into(),
578                upstream_ddl: upstream_ddl.clone(),
579            });
580        }
581
582        Ok(SchemaChangeEnvelope {
583            table_changes: schema_changes,
584        })
585    } else {
586        Err(AccessError::Undefined {
587            name: "table schema change".into(),
588            path: TABLE_CHANGES.into(),
589        })
590    }
591}
592
593impl<A> DebeziumChangeEvent<A>
594where
595    A: Access,
596{
597    /// Panic: one of the `key_accessor` or `value_accessor` must be provided.
598    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
599        assert!(key_accessor.is_some() || value_accessor.is_some());
600        Self {
601            value_accessor,
602            key_accessor,
603            is_mongodb: false,
604        }
605    }
606
607    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
608        assert!(key_accessor.is_some() || value_accessor.is_some());
609        Self {
610            value_accessor,
611            key_accessor,
612            is_mongodb: true,
613        }
614    }
615
616    /// Returns the transaction metadata if exists.
617    ///
618    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
619    pub(crate) fn transaction_control(
620        &self,
621        connector_props: &ConnectorProperties,
622    ) -> Option<TransactionControl> {
623        // Ignore if `value_accessor` is not provided or there's any error when
624        // trying to parse the transaction metadata.
625        self.value_accessor
626            .as_ref()
627            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
628    }
629}
630
631impl<A> ChangeEvent for DebeziumChangeEvent<A>
632where
633    A: Access,
634{
635    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
636        match self.op()? {
637            ChangeEventOperation::Delete => {
638                // For delete events of MongoDB, the "before" and "after" field both are null in the value,
639                // we need to extract the _id field from the key.
640                if self.is_mongodb && desc.name == "_id" {
641                    return self
642                        .key_accessor
643                        .as_ref()
644                        .expect("key_accessor must be provided for delete operation")
645                        .access(&[&desc.name], &desc.data_type);
646                }
647
648                if let Some(va) = self.value_accessor.as_ref() {
649                    va.access(&[BEFORE, &desc.name], &desc.data_type)
650                } else {
651                    self.key_accessor
652                        .as_ref()
653                        .unwrap()
654                        .access(&[&desc.name], &desc.data_type)
655                }
656            }
657
658            // value should not be None.
659            ChangeEventOperation::Upsert => {
660                // For upsert operation, if desc is an additional column, access field in the `SOURCE` field.
661                desc.additional_column.column_type.as_ref().map_or_else(
662                    || {
663                        self.value_accessor
664                            .as_ref()
665                            .expect("value_accessor must be provided for upsert operation")
666                            .access(&[AFTER, &desc.name], &desc.data_type)
667                    },
668                    |additional_column_type| {
669                        match *additional_column_type {
670                            ColumnType::Timestamp(_) => {
671                                // access payload.source.ts_ms
672                                let ts_ms = self
673                                    .value_accessor
674                                    .as_ref()
675                                    .expect("value_accessor must be provided for upsert operation")
676                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
677                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
678                                    Timestamptz::from_millis(scalar.into_int64())
679                                        .expect("source.ts_ms must in millisecond")
680                                        .to_scalar_value()
681                                })))
682                            }
683                            ColumnType::DatabaseName(_) => self
684                                .value_accessor
685                                .as_ref()
686                                .expect("value_accessor must be provided for upsert operation")
687                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
688                            ColumnType::SchemaName(_) => self
689                                .value_accessor
690                                .as_ref()
691                                .expect("value_accessor must be provided for upsert operation")
692                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
693                            ColumnType::TableName(_) => self
694                                .value_accessor
695                                .as_ref()
696                                .expect("value_accessor must be provided for upsert operation")
697                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
698                            ColumnType::CollectionName(_) => self
699                                .value_accessor
700                                .as_ref()
701                                .expect("value_accessor must be provided for upsert operation")
702                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
703                            _ => Err(AccessError::UnsupportedAdditionalColumn {
704                                name: desc.name.clone(),
705                            }),
706                        }
707                    },
708                )
709            }
710        }
711    }
712
713    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
714        if let Some(accessor) = &self.value_accessor {
715            if let Some(ScalarRefImpl::Utf8(op)) =
716                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
717            {
718                match op {
719                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
720                        return Ok(ChangeEventOperation::Upsert);
721                    }
722                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
723                    _ => (),
724                }
725            }
726            Err(super::AccessError::Undefined {
727                name: "op".into(),
728                path: Default::default(),
729            })
730        } else {
731            Ok(ChangeEventOperation::Delete)
732        }
733    }
734}
735
/// Access support for `MongoDB` Debezium events.
///
/// Wraps an inner accessor over the Debezium message. Paths under `after` / `before` are
/// resolved by parsing that field as a (BSON extended-JSON) document; the `["_id"]` path is
/// resolved against the message key. Other paths are forwarded to the inner accessor.
pub struct MongoJsonAccess<A> {
    // Accessor over the raw Debezium message (value or key).
    accessor: A,
    // When `true`, each column is extracted from the document by name and converted to the
    // column type. When `false`, only `_id` and the whole document (as the `payload` column,
    // typed `Jsonb`) are available from `after` / `before`.
    strong_schema: bool,
}
743
744pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
745    let id_field = if let Some(value) = bson_doc.get("_id") {
746        value
747    } else {
748        bson_doc
749    };
750
751    let type_error = || AccessError::TypeError {
752        expected: id_type.to_string(),
753        got: match id_field {
754            serde_json::Value::Null => "null",
755            serde_json::Value::Bool(_) => "bool",
756            serde_json::Value::Number(_) => "number",
757            serde_json::Value::String(_) => "string",
758            serde_json::Value::Array(_) => "array",
759            serde_json::Value::Object(_) => "object",
760        }
761        .to_owned(),
762        value: id_field.to_string(),
763    };
764
765    let id: Datum = match id_type {
766        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
767        DataType::Varchar => match id_field {
768            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
769            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
770                obj["$oid"].as_str().unwrap_or_default().into(),
771            )),
772            _ => return Err(type_error()),
773        },
774        DataType::Int32 => {
775            if let serde_json::Value::Object(obj) = id_field
776                && obj.contains_key("$numberInt")
777            {
778                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
779                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
780            } else {
781                return Err(type_error());
782            }
783        }
784        DataType::Int64 => {
785            if let serde_json::Value::Object(obj) = id_field
786                && obj.contains_key("$numberLong")
787            {
788                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
789                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
790            } else {
791                return Err(type_error());
792            }
793        }
794        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
795    };
796    Ok(id)
797}
798
799/// Extract the field data from the bson document
800///
801/// BSON document is a JSON object with some special fields, such as:
802/// long integer: {"$numberLong": "1630454400000"}
803/// date time: {"$date": {"$numberLong": "1630454400000"}}
804///
805/// For now, we support only the Canonical format of the date and timestamp.
806///
807/// # NOTE:
808///
809/// - `field` indicates the field name in the bson document, if it is None, the `bson_doc` is the field itself.
810// similar to extract the "_id" field from the message payload
811pub fn extract_bson_field(
812    type_expected: &DataType,
813    bson_doc: &serde_json::Value,
814    field: Option<&str>,
815) -> AccessResult {
816    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
817        expected: type_expected.to_string(),
818        got: match bson_doc {
819            serde_json::Value::Null => "null",
820            serde_json::Value::Bool(_) => "bool",
821            serde_json::Value::Number(_) => "number",
822            serde_json::Value::String(_) => "string",
823            serde_json::Value::Array(_) => "array",
824            serde_json::Value::Object(_) => "object",
825        }
826        .to_owned(),
827        value: datum.to_string(),
828    };
829
830    let datum = if let Some(field) = field {
831        let Some(bson_doc) = bson_doc.get(field) else {
832            return Err(type_error(bson_doc));
833        };
834        bson_doc
835    } else {
836        bson_doc
837    };
838
839    if datum.is_null() {
840        return Ok(None);
841    }
842
843    let field_datum: Datum = match type_expected {
844        DataType::Boolean => {
845            if datum.is_boolean() {
846                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
847            } else {
848                return Err(type_error(datum));
849            }
850        }
851        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
852        DataType::Varchar => match datum {
853            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
854            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
855                obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
856            }
857            _ => return Err(type_error(datum)),
858        },
859        DataType::Int16
860        | DataType::Int32
861        | DataType::Int64
862        | DataType::Int256
863        | DataType::Float32
864        | DataType::Float64 => {
865            if !datum.is_object() {
866                return Err(type_error(datum));
867            };
868
869            bson_extract_number(datum, type_expected)?
870        }
871
872        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
873            if let serde_json::Value::Object(mp) = datum {
874                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
875                    bson_extract_timestamp(datum, type_expected)?
876                } else if mp.contains_key("$date") {
877                    bson_extract_date(datum, type_expected)?
878                } else {
879                    return Err(type_error(datum));
880                }
881            } else {
882                return Err(type_error(datum));
883            }
884        }
885        DataType::Decimal => {
886            if let serde_json::Value::Object(obj) = datum
887                && obj.contains_key("$numberDecimal")
888                && obj["$numberDecimal"].is_string()
889            {
890                let number = obj["$numberDecimal"].as_str().unwrap();
891
892                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
893                    AccessError::TypeError {
894                        expected: type_expected.to_string(),
895                        got: "unparsable string".into(),
896                        value: number.to_owned(),
897                    }
898                })?;
899                Some(ScalarImpl::Decimal(dec))
900            } else {
901                return Err(type_error(datum));
902            }
903        }
904
905        DataType::Bytea => {
906            if let serde_json::Value::Object(obj) = datum
907                && obj.contains_key("$binary")
908                && obj["$binary"].is_object()
909            {
910                use base64::Engine;
911
912                let binary = obj["$binary"].as_object().unwrap();
913
914                if !binary.contains_key("$base64")
915                    || !binary["$base64"].is_string()
916                    || !binary.contains_key("$subType")
917                    || !binary["$subType"].is_string()
918                {
919                    return Err(AccessError::TypeError {
920                        expected: type_expected.to_string(),
921                        got: "object".into(),
922                        value: datum.to_string(),
923                    });
924                }
925
926                let b64_str = binary["$base64"]
927                    .as_str()
928                    .ok_or_else(|| AccessError::TypeError {
929                        expected: type_expected.to_string(),
930                        got: "object".into(),
931                        value: datum.to_string(),
932                    })?;
933
934                // type is not used for now
935                let _type_str =
936                    binary["$subType"]
937                        .as_str()
938                        .ok_or_else(|| AccessError::TypeError {
939                            expected: type_expected.to_string(),
940                            got: "object".into(),
941                            value: datum.to_string(),
942                        })?;
943
944                let bytes = base64::prelude::BASE64_STANDARD
945                    .decode(b64_str)
946                    .map_err(|_| AccessError::TypeError {
947                        expected: "$binary object with $base64 string and $subType string field"
948                            .to_owned(),
949                        got: "string".to_owned(),
950                        value: bson_doc.to_string(),
951                    })?;
952                let bytea = ScalarImpl::Bytea(bytes.into());
953                Some(bytea)
954            } else {
955                return Err(type_error(datum));
956            }
957        }
958
959        DataType::Struct(struct_fields) => {
960            let mut datums = vec![];
961            for (field_name, field_type) in struct_fields.iter() {
962                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
963                datums.push(field_datum);
964            }
965            let value = StructValue::new(datums);
966
967            Some(ScalarImpl::Struct(value))
968        }
969
970        DataType::List(list_type) => {
971            let elem_type = list_type.elem();
972            let Some(d_array) = datum.as_array() else {
973                return Err(type_error(datum));
974            };
975
976            let mut builder = elem_type.create_array_builder(d_array.len());
977            for item in d_array {
978                builder.append(extract_bson_field(elem_type, item, None)?);
979            }
980            Some(ScalarImpl::from(ListValue::new(builder.finish())))
981        }
982
983        _ => {
984            if let Some(field_name) = field {
985                unreachable!(
986                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
987                )
988            } else {
989                let type_expected = type_expected.to_string();
990                unreachable!(
991                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
992                )
993            }
994        }
995    };
996    Ok(field_datum)
997}
998
999fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1000    let field_name = match type_expected {
1001        DataType::Int16 => "$numberInt",
1002        DataType::Int32 => "$numberInt",
1003        DataType::Int64 => "$numberLong",
1004        DataType::Int256 => "$numberLong",
1005        DataType::Float32 => "$numberDouble",
1006        DataType::Float64 => "$numberDouble",
1007        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1008    };
1009
1010    let datum = bson_doc.get(field_name);
1011    if datum.is_none() {
1012        return Err(AccessError::TypeError {
1013            expected: type_expected.to_string(),
1014            got: "object".into(),
1015            value: bson_doc.to_string(),
1016        });
1017    }
1018
1019    let datum = datum.unwrap();
1020
1021    if datum.is_string() {
1022        let Some(num_str) = datum.as_str() else {
1023            return Err(AccessError::TypeError {
1024                expected: type_expected.to_string(),
1025                got: "string".into(),
1026                value: datum.to_string(),
1027            });
1028        };
1029        // parse to float
1030        if [DataType::Float32, DataType::Float64].contains(type_expected) {
1031            match (num_str, type_expected) {
1032                ("Infinity", DataType::Float64) => {
1033                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
1034                }
1035                ("Infinity", DataType::Float32) => {
1036                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
1037                }
1038                ("-Infinity", DataType::Float64) => {
1039                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
1040                }
1041                ("-Infinity", DataType::Float32) => {
1042                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
1043                }
1044                ("NaN", DataType::Float64) => {
1045                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
1046                }
1047                ("NaN", DataType::Float32) => {
1048                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
1049                }
1050                _ => {}
1051            }
1052
1053            let parsed_num: f64 = match num_str.parse() {
1054                Ok(n) => n,
1055                Err(_e) => {
1056                    return Err(AccessError::TypeError {
1057                        expected: type_expected.to_string(),
1058                        got: "string".into(),
1059                        value: num_str.to_owned(),
1060                    });
1061                }
1062            };
1063            if *type_expected == DataType::Float64 {
1064                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
1065            } else {
1066                let parsed_num = parsed_num as f32;
1067                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
1068            }
1069        }
1070        // parse to large int
1071        if *type_expected == DataType::Int256 {
1072            let parsed_num = match Int256::from_str(num_str) {
1073                Ok(n) => n,
1074                Err(_) => {
1075                    return Err(AccessError::TypeError {
1076                        expected: type_expected.to_string(),
1077                        got: "string".into(),
1078                        value: num_str.to_owned(),
1079                    });
1080                }
1081            };
1082            return Ok(Some(ScalarImpl::Int256(parsed_num)));
1083        }
1084
1085        // parse to integer
1086        let parsed_num: i64 = match num_str.parse() {
1087            Ok(n) => n,
1088            Err(_e) => {
1089                return Err(AccessError::TypeError {
1090                    expected: type_expected.to_string(),
1091                    got: "string".into(),
1092                    value: num_str.to_owned(),
1093                });
1094            }
1095        };
1096        match type_expected {
1097            DataType::Int16 => {
1098                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
1099                    return Err(AccessError::TypeError {
1100                        expected: type_expected.to_string(),
1101                        got: "string".into(),
1102                        value: num_str.to_owned(),
1103                    });
1104                }
1105                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
1106            }
1107            DataType::Int32 => {
1108                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
1109                    return Err(AccessError::TypeError {
1110                        expected: type_expected.to_string(),
1111                        got: "string".into(),
1112                        value: num_str.to_owned(),
1113                    });
1114                }
1115                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
1116            }
1117            DataType::Int64 => {
1118                return Ok(Some(ScalarImpl::Int64(parsed_num)));
1119            }
1120            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1121        }
1122    }
1123    if datum.is_null() {
1124        return Err(AccessError::TypeError {
1125            expected: type_expected.to_string(),
1126            got: "null".into(),
1127            value: bson_doc.to_string(),
1128        });
1129    }
1130
1131    if datum.is_array() {
1132        return Err(AccessError::TypeError {
1133            expected: type_expected.to_string(),
1134            got: "array".to_owned(),
1135            value: datum.to_string(),
1136        });
1137    }
1138
1139    if datum.is_object() {
1140        return Err(AccessError::TypeError {
1141            expected: type_expected.to_string(),
1142            got: "object".to_owned(),
1143            value: datum.to_string(),
1144        });
1145    }
1146
1147    if datum.is_boolean() {
1148        return Err(AccessError::TypeError {
1149            expected: type_expected.to_string(),
1150            got: "boolean".into(),
1151            value: bson_doc.to_string(),
1152        });
1153    }
1154
1155    if datum.is_number() {
1156        let got_type = if datum.is_f64() { "f64" } else { "i64" };
1157        return Err(AccessError::TypeError {
1158            expected: type_expected.to_string(),
1159            got: got_type.into(),
1160            value: bson_doc.to_string(),
1161        });
1162    }
1163
1164    Err(AccessError::TypeError {
1165        expected: type_expected.to_string(),
1166        got: "unknown".into(),
1167        value: bson_doc.to_string(),
1168    })
1169}
1170
1171fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1172    // according to mongodb extended json v2
1173    // the date could be:
1174    //
1175    // the timestamp type could be:
1176    //
1177    // both Canonical and Relaxed format:
1178    // {"$timestamp": {"t": 1630454400, "i": 1}}
1179    //
1180    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
1181    // date is encoded as number of milliseconds since the Unix epoch
1182    //
1183    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
1184    // date is encoded as ISO8601 string
1185
1186    let datum = &bson_doc["$date"];
1187
1188    let type_error = || AccessError::TypeError {
1189        expected: type_expected.to_string(),
1190        got: match bson_doc {
1191            serde_json::Value::Null => "null",
1192            serde_json::Value::Bool(_) => "bool",
1193            serde_json::Value::Number(_) => "number",
1194            serde_json::Value::String(_) => "string",
1195            serde_json::Value::Array(_) => "array",
1196            serde_json::Value::Object(_) => "object",
1197        }
1198        .to_owned(),
1199        value: datum.to_string(),
1200    };
1201
1202    // deal with the Canonical format only
1203    let millis = match datum {
1204        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
1205        serde_json::Value::Object(obj)
1206            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1207        {
1208            obj["$numberLong"]
1209                .as_str()
1210                .unwrap()
1211                .parse::<i64>()
1212                .map_err(|_| AccessError::TypeError {
1213                    expected: "timestamp".into(),
1214                    got: "object".into(),
1215                    value: datum.to_string(),
1216                })?
1217        }
1218        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
1219        serde_json::Value::String(s) => {
1220            let dt =
1221                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1222                    expected: "valid ISO-8601 date string".into(),
1223                    got: "string".into(),
1224                    value: datum.to_string(),
1225                })?;
1226            dt.timestamp_millis()
1227        }
1228
1229        // jsonv1 format
1230        // {"$date": 1630454400000}
1231        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1232            expected: "timestamp".into(),
1233            got: "number".into(),
1234            value: datum.to_string(),
1235        })?,
1236
1237        _ => return Err(type_error()),
1238    };
1239
1240    let datetime =
1241        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1242            expected: "timestamp".into(),
1243            got: "object".into(),
1244            value: datum.to_string(),
1245        })?;
1246
1247    let res = match type_expected {
1248        DataType::Date => {
1249            let naive = datetime.naive_local();
1250            let dt = naive.date();
1251            Some(ScalarImpl::Date(dt.into()))
1252        }
1253        DataType::Time => {
1254            let naive = datetime.naive_local();
1255            let dt = naive.time();
1256            Some(ScalarImpl::Time(dt.into()))
1257        }
1258        DataType::Timestamp => {
1259            let naive = datetime.naive_local();
1260            let dt = Timestamp::from(naive);
1261            Some(ScalarImpl::Timestamp(dt))
1262        }
1263        DataType::Timestamptz => {
1264            let dt = datetime.into();
1265            Some(ScalarImpl::Timestamptz(dt))
1266        }
1267        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1268    };
1269    Ok(res)
1270}
1271
1272fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1273    // according to mongodb extended json v2
1274    // the date could be:
1275    //
1276    // the timestamp type could be:
1277    //
1278    // both Canonical and Relaxed format:
1279    // {"$timestamp": {"t": 1630454400, "i": 1}}
1280    // t is the number of seconds since the Unix epoch
1281    //
1282    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
1283    // date is encoded as number of milliseconds since the Unix epoch
1284    //
1285    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
1286    // date is encoded as ISO8601 string
1287    //
1288    // *For now, we support the Canonical format only.*
1289
1290    let Some(obj) = bson_doc["$timestamp"].as_object() else {
1291        return Err(AccessError::TypeError {
1292            expected: "timestamp".into(),
1293            got: "object".into(),
1294            value: bson_doc.to_string(),
1295        });
1296    };
1297
1298    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1299    {
1300        return Err(AccessError::TypeError {
1301            expected: "timestamp with valid seconds since epoch".into(),
1302            got: "object".into(),
1303            value: bson_doc.to_string(),
1304        });
1305    }
1306
1307    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1308        expected: "timestamp with valid seconds since epoch".into(),
1309        got: "object".into(),
1310        value: bson_doc.to_string(),
1311    })?;
1312
1313    let chrono_datetime =
1314        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1315            expected: type_expected.to_string(),
1316            got: "object".to_owned(),
1317            value: bson_doc.to_string(),
1318        })?;
1319
1320    let res = match type_expected {
1321        DataType::Date => {
1322            let naive = chrono_datetime.naive_local();
1323            let dt = naive.date();
1324            Some(ScalarImpl::Date(dt.into()))
1325        }
1326        DataType::Time => {
1327            let naive = chrono_datetime.naive_local();
1328            let dt = naive.time();
1329            Some(ScalarImpl::Time(dt.into()))
1330        }
1331        DataType::Timestamp => {
1332            let naive = chrono_datetime.naive_local();
1333            let dt = Timestamp::from(naive);
1334            Some(ScalarImpl::Timestamp(dt))
1335        }
1336        DataType::Timestamptz => {
1337            let dt = chrono_datetime.into();
1338            Some(ScalarImpl::Timestamptz(dt))
1339        }
1340        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1341    };
1342
1343    Ok(res)
1344}
1345
1346impl<A> MongoJsonAccess<A> {
1347    pub fn new(accessor: A, strong_schema: bool) -> Self {
1348        Self {
1349            accessor,
1350            strong_schema,
1351        }
1352    }
1353}
1354
1355impl<A> Access for MongoJsonAccess<A>
1356where
1357    A: Access,
1358{
1359    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1360        match path {
1361            ["after" | "before", "_id"] => {
1362                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1363                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1364                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1365                } else {
1366                    // fail to extract the "_id" field from the message payload
1367                    Err(AccessError::Undefined {
1368                        name: "_id".to_owned(),
1369                        path: path[0].to_owned(),
1370                    })?
1371                }
1372            }
1373
1374            ["after" | "before", "payload"] if !self.strong_schema => {
1375                self.access(&[path[0]], &DataType::Jsonb)
1376            }
1377
1378            ["after" | "before", field] if self.strong_schema => {
1379                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1380                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1381                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1382                } else {
1383                    // fail to extract the expected field from the message payload
1384                    Err(AccessError::Undefined {
1385                        name: field.to_string(),
1386                        path: path[0].to_owned(),
1387                    })?
1388                }
1389            }
1390
1391            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
1392            // In addition, the "_id" field is named as "id" in the key. An example of message key:
1393            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
1394            ["_id"] => {
1395                let ret = self.accessor.access(path, type_expected);
1396                if matches!(ret, Err(AccessError::Undefined { .. })) {
1397                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1398                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1399                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1400                    } else {
1401                        // fail to extract the "_id" field from the message key
1402                        Err(AccessError::Undefined {
1403                            name: "_id".to_owned(),
1404                            path: "id".to_owned(),
1405                        })?
1406                    }
1407                } else {
1408                    ret
1409                }
1410            }
1411            _ => self.accessor.access(path, type_expected),
1412        }
1413    }
1414}