risingwave_connector/parser/unified/
debezium.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29
/// JDBC type constants found in Debezium schema change events.
mod debezium_sql_types {
    /// `java.sql.Types.STRUCT` (2002) — marks composite (user-defined struct) columns.
    pub const STRUCT: i32 = 2002;
}
34use crate::connector_common::{create_pg_client_from_properties, discover_pgvector_dimensions};
35use crate::parser::TransactionControl;
36use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
37use crate::parser::schema_change::TableChangeType;
38use crate::source::cdc::build_cdc_table_id;
39use crate::source::cdc::external::mysql::{
40    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
41};
42use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
43use crate::source::{ConnectorProperties, SourceColumnDesc};
44
/// Parse Debezium `tableChanges[].id` into `(schema_name, table_name)`.
///
/// Input examples observed in Debezium schema-change events:
/// - `"public"."orders"` (quoted format)
/// - `public.orders` (plain dotted format)
/// - `db.public.orders` (database-prefixed dotted format)
///
/// Why two parsing branches:
/// - Different Debezium versions/connectors may emit quoted or plain identifiers.
/// - We normalize both forms so downstream lookup can consistently query upstream catalogs.
///
/// Output:
/// - `Some((schema, table))` when both schema and table are successfully extracted.
/// - `None` when the id is malformed or does not contain enough segments.
fn parse_schema_table_from_debezium_id(id: &str) -> Option<(String, String)> {
    let trimmed = id.trim();

    // Quoted form: split on the `"."` separator, then strip the remaining
    // leading/trailing quotes and whitespace from each segment.
    if trimmed.contains("\".\"") {
        let segments: Vec<&str> = trimmed
            .split("\".\"")
            .map(|seg| seg.trim_matches('"').trim())
            .collect();
        // Take the last two segments, ignoring any database prefix.
        if let [.., schema, table] = segments.as_slice() {
            if !schema.is_empty() && !table.is_empty() {
                return Some(((*schema).to_owned(), (*table).to_owned()));
            }
        }
    }

    // Plain dotted form (also the fallback when the quoted parse fails).
    let dotted: Vec<&str> = trimmed.trim_matches('"').split('.').collect();
    if let [.., schema, table] = dotted.as_slice() {
        let schema = schema.trim();
        let table = table.trim();
        if !schema.is_empty() && !table.is_empty() {
            return Some((schema.to_owned(), table.to_owned()));
        }
    }

    None
}
86
87async fn fetch_pgvector_dimensions_for_table(
88    connector_props: &ConnectorProperties,
89    schema: &str,
90    table: &str,
91) -> AccessResult<std::collections::HashMap<String, usize>> {
92    let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
93        return Ok(std::collections::HashMap::new());
94    };
95
96    let client = create_pg_client_from_properties(&cdc_props.properties, None)
97        .await
98        .map_err(|err| AccessError::Uncategorized {
99            message: format!(
100                "failed to connect upstream postgres for schema change lookup: {}",
101                err.as_report()
102            ),
103        })?;
104
105    discover_pgvector_dimensions(&client, schema, table)
106        .await
107        .map_err(|err| AccessError::Uncategorized {
108            message: format!(
109                "failed to query upstream postgres schema for {schema}.{table}: {}",
110                err.as_report()
111            ),
112        })
113}
114
// Example of Debezium JSON value:
// {
//     "payload":
//     {
//         "before": null,
//         "after":
//         {
//             "O_ORDERKEY": 5,
//             "O_CUSTKEY": 44485,
//             "O_ORDERSTATUS": "F",
//             "O_TOTALPRICE": "144659.20",
//             "O_ORDERDATE": "1994-07-30"
//         },
//         "source":
//         {
//             "version": "1.9.7.Final",
//             "connector": "mysql",
//             "name": "RW_CDC_1002",
//             "ts_ms": 1695277757000,
//             "db": "mydb",
//             "sequence": null,
//             "table": "orders",
//             "server_id": 0,
//             "gtid": null,
//             "file": "binlog.000008",
//             "pos": 3693,
//             "row": 0,
//         },
//         "op": "r",
//         "ts_ms": 1695277757017,
//         "transaction": null
//     }
// }
/// A single Debezium CDC change event, wrapping the (optional) key and value
/// accessors of one message. At least one accessor must be present — see
/// [`Self::new`].
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message value (`before`/`after`/`source` fields).
    /// `None` for tombstone-style messages carrying only a key.
    value_accessor: Option<A>,
    /// Accessor over the message key; used when the value is absent (treated as
    /// delete) and for extracting `_id` on MongoDB delete events.
    key_accessor: Option<A>,
    /// Whether this event comes from a MongoDB connector, which changes how the
    /// `_id` column is read on deletes.
    is_mongodb: bool,
}
153
// Field names inside Debezium's `payload` envelope.
const BEFORE: &str = "before";
const AFTER: &str = "after";

// Fields of schema-change messages and of the `payload.source` metadata object.
const UPSTREAM_DDL: &str = "ddl";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
// Counterpart of `table` for collection-based sources (e.g. MongoDB).
const SOURCE_COLLECTION: &str = "collection";

// Operation discriminator of a change event.
const OP: &str = "op";
/// Status field of a transaction-metadata message (`BEGIN`/`END`).
pub const TRANSACTION_STATUS: &str = "status";
/// Transaction-id field of a transaction-metadata message.
pub const TRANSACTION_ID: &str = "id";

/// Field listing per-table changes in a Debezium schema-change message.
pub const TABLE_CHANGES: &str = "tableChanges";

// Values of the `op` field: snapshot read, create, update, delete.
pub const DEBEZIUM_READ_OP: &str = "r";
pub const DEBEZIUM_CREATE_OP: &str = "c";
pub const DEBEZIUM_UPDATE_OP: &str = "u";
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// `status` value marking the begin of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// `status` value marking the end (commit) of a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
178
179pub fn parse_transaction_meta(
180    accessor: &impl Access,
181    connector_props: &ConnectorProperties,
182) -> AccessResult<TransactionControl> {
183    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
184        accessor
185            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
186            .to_datum_ref(),
187        accessor
188            .access(&[TRANSACTION_ID], &DataType::Varchar)?
189            .to_datum_ref(),
190    ) {
191        // The id field has different meanings for different databases:
192        // PG: txID:LSN
193        // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23)
194        // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002)
195        match status {
196            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
197                ConnectorProperties::PostgresCdc(_) => {
198                    let (tx_id, _) = id.split_once(':').unwrap();
199                    return Ok(TransactionControl::Begin { id: tx_id.into() });
200                }
201                ConnectorProperties::MysqlCdc(_) => {
202                    return Ok(TransactionControl::Begin { id: id.into() });
203                }
204                ConnectorProperties::SqlServerCdc(_) => {
205                    return Ok(TransactionControl::Begin { id: id.into() });
206                }
207                _ => {}
208            },
209            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
210                ConnectorProperties::PostgresCdc(_) => {
211                    let (tx_id, _) = id.split_once(':').unwrap();
212                    return Ok(TransactionControl::Commit { id: tx_id.into() });
213                }
214                ConnectorProperties::MysqlCdc(_) => {
215                    return Ok(TransactionControl::Commit { id: id.into() });
216                }
217                ConnectorProperties::SqlServerCdc(_) => {
218                    return Ok(TransactionControl::Commit { id: id.into() });
219                }
220                _ => {}
221            },
222            _ => {}
223        }
224    }
225
226    Err(AccessError::Undefined {
227        name: "transaction status".into(),
228        path: TRANSACTION_STATUS.into(),
229    })
230}
231
/// Read `$field` from the JSONB object `$col` and convert it via the generated
/// `as_$as_type()` accessor (e.g. `string`, `number`).
///
/// Panics if the field is missing or has an unexpected type — callers rely on
/// the fixed layout of Debezium schema-change messages.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}
239
/// Parse the schema change message from Debezium.
/// The layout of MySQL schema change message can refer to
/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
///
/// For each entry in `tableChanges`, a [`TableSchemaChange`] is produced that carries
/// the full (post-DDL) column list of the table. `CREATE`/`DROP` table events are
/// skipped; only alter-style changes are forwarded.
///
/// # Errors
/// - [`AccessError::CdcAutoSchemaChangeError`] for upstream column types that cannot
///   be mapped to a RisingWave type (including `vector` columns whose dimension
///   cannot be resolved from the upstream catalog).
/// - [`AccessError::Undefined`] when the message carries no `tableChanges` field.
pub async fn parse_schema_change(
    accessor: &impl Access,
    source_id: SourceId,
    source_name: &str,
    connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
    let mut schema_changes = vec![];
    // Memoizes upstream pgvector column dimensions per `(schema, table)` so the
    // upstream Postgres catalog is queried at most once per table in this message.
    let mut pgvector_dims_cache: std::collections::HashMap<
        (String, String),
        std::collections::HashMap<String, usize>,
    > = std::collections::HashMap::new();

    // The raw DDL statement that triggered this event; recorded into each change.
    let upstream_ddl: String = accessor
        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
        .to_owned_datum()
        .unwrap()
        .as_utf8()
        .to_string();

    if let Some(ScalarRefImpl::List(table_changes)) = accessor
        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
        .to_datum_ref()
    {
        for datum in table_changes.iter() {
            let jsonb = match datum {
                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
                _ => unreachable!(""),
            };
            let id: String = jsonb_access_field!(jsonb, "id", string);
            let ty = jsonb_access_field!(jsonb, "type", string);

            let table_name = id.trim_matches('"').to_owned();
            let ddl_type: TableChangeType = ty.as_str().into();
            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
                tracing::debug!("skip table schema change for create/drop command");
                continue;
            }

            let mut column_descs: Vec<ColumnDesc> = vec![];
            if let Some(table) = jsonb.access_object_field("table")
                && let Some(columns) = table.access_object_field("columns")
            {
                for col in columns.array_elements().unwrap() {
                    let name = jsonb_access_field!(col, "name", string);
                    let type_name = jsonb_access_field!(col, "typeName", string);
                    // User-defined types (enum, composite) are not in the
                    // `type_name_to_pg_type` whitelist because their type names are
                    // user-chosen. We detect them via Debezium metadata instead:
                    //  - Enum: has a non-null `enumValues` field in the column descriptor.
                    //  - Composite (STRUCT): identified by `jdbcType == 2002`.
                    // Both are mapped to Varchar — enum values are plain strings, and
                    // composite values are converted to text by our CustomConverter.
                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
                    let is_composite = col
                        .access_object_field("jdbcType")
                        .and_then(|v| v.as_number().ok())
                        .map(|n| n.0 as i32)
                        == Some(debezium_sql_types::STRUCT);

                    // Map the upstream column type name to a RisingWave `DataType`.
                    let data_type = match *connector_props {
                        ConnectorProperties::PostgresCdc(_) => {
                            if is_composite || is_enum {
                                tracing::debug!(target: "auto_schema_change",
                                    "Convert PostgreSQL user-defined type '{}' ({}) to VARCHAR",
                                    type_name,
                                    if is_composite { "composite" } else { "enum" });
                                DataType::Varchar
                            } else if type_name.eq_ignore_ascii_case("vector") {
                                // pgvector columns need their dimension from the
                                // upstream catalog; first resolve which table to ask.
                                let Some((schema_name, table_name_only)) =
                                    parse_schema_table_from_debezium_id(id.as_str())
                                else {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                };

                                // Cache by normalized `(schema, table)` tuple to avoid split/join churn on id text.
                                let cache_key = (schema_name, table_name_only);
                                if !pgvector_dims_cache.contains_key(&cache_key) {
                                    let fetched = fetch_pgvector_dimensions_for_table(
                                        connector_props,
                                        &cache_key.0,
                                        &cache_key.1,
                                    )
                                    .await?;
                                    pgvector_dims_cache.insert(cache_key.clone(), fetched);
                                }

                                match pgvector_dims_cache
                                    .get(&cache_key)
                                    .and_then(|m| m.get(name.as_str()).copied())
                                {
                                    Some(dim) if (1..=DataType::VEC_MAX_SIZE).contains(&dim) => {
                                        DataType::Vector(dim)
                                    }
                                    _ => {
                                        // No fallback to VARCHAR: a dimension-less vector cannot be mapped to
                                        // RW's required `vector(n)` type safely.
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            } else {
                                // Regular built-in Postgres type: whitelist lookup,
                                // then conversion to a RW type.
                                let ty = type_name_to_pg_type(type_name.as_str());
                                match ty {
                                    Some(ty) => match pg_type_to_rw_type(&ty) {
                                        Ok(data_type) => data_type,
                                        Err(err) => {
                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
                                            return Err(AccessError::CdcAutoSchemaChangeError {
                                                ty: type_name,
                                                table_name: format!(
                                                    "{}.{}",
                                                    source_name, table_name
                                                ),
                                            });
                                        }
                                    },
                                    None => {
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            }
                        }
                        ConnectorProperties::MysqlCdc(_) => {
                            let ty = type_name_to_mysql_type(type_name.as_str());
                            match ty {
                                Some(ty) => match mysql_type_to_rw_type(&ty) {
                                    Ok(data_type) => data_type,
                                    Err(err) => {
                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                },
                                None => {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                }
                            }
                        }
                        // Auto schema change is only reachable for MySQL/Postgres CDC.
                        _ => {
                            unreachable!()
                        }
                    };

                    // Handle default value expression. Non-constant defaults (`now()`,
                    // `gen_random_uuid()`, `nextval('seq'::regclass)` for BIGSERIAL, etc.)
                    // will fail `ScalarImpl::from_text` and fall through to the fail-open
                    // branch below — the column is added without a default and a warning is
                    // logged, rather than aborting the whole auto schema change.
                    //
                    // TODO: the schema change event carries the **full** set of columns of
                    // the table, so here we cannot tell "columns newly added by this ALTER"
                    // from "columns that already existed". A better approach would be to
                    // only process the delta columns that actually changed in this event,
                    // which would let us precisely tell the user: which columns have
                    // existing rows filled with NULL (needs attention), and which ones are
                    // pre-existing columns merely surfaced by the event (harmless, can be
                    // ignored silently).
                    let column_desc = match col.access_object_field("defaultValueExpression") {
                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
                            let value_text: Option<String>;
                            match *connector_props {
                                ConnectorProperties::PostgresCdc(_) => {
                                    // default value of non-number data type will be stored as
                                    // "'value'::type"
                                    match default_val_expr_str
                                        .split("::")
                                        .map(|s| s.trim_matches('\''))
                                        .next()
                                    {
                                        None => {
                                            value_text = None;
                                        }
                                        Some(val_text) => {
                                            value_text = Some(val_text.to_owned());
                                        }
                                    }
                                }
                                ConnectorProperties::MysqlCdc(_) => {
                                    // mysql timestamp is mapped to timestamptz, we use UTC timezone to
                                    // interpret its value
                                    if data_type == DataType::Timestamptz {
                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
                                            AccessError::TypeError {
                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
                                                got: data_type.to_string(),
                                                value: default_val_expr_str.to_owned(),
                                            }
                                        })?);
                                    } else {
                                        value_text = Some(default_val_expr_str.to_owned());
                                    }
                                }
                                _ => {
                                    unreachable!("connector doesn't support schema change")
                                }
                            }

                            // Try to materialize the textual default into a constant
                            // value; on failure the column is added without a default.
                            let snapshot_value: Datum = value_text.and_then(|value_text| {
                                ScalarImpl::from_text(value_text.as_str(), &data_type)
                                    .inspect_err(|err| {
                                        tracing::warn!(
                                            target: "auto_schema_change",
                                            error = %err.as_report(),
                                            column = %name,
                                            data_type = %data_type,
                                            default_value_expression = default_val_expr_str,
                                            upstream_ddl = %upstream_ddl,
                                            "non-constant default expression, column added without default. \
                                             If this column is not newly added by this schema change, it is safe to ignore this warning. \
                                             If this column is newly added by this schema change, existing rows will be NULL in this column — consider using COALESCE in queries to provide a fallback value."
                                        );
                                    })
                                    .ok()
                            });

                            if snapshot_value.is_none() {
                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
                            } else {
                                ColumnDesc::named_with_default_value(
                                    name,
                                    ColumnId::placeholder(),
                                    data_type,
                                    snapshot_value,
                                )
                            }
                        }
                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
                    };
                    column_descs.push(column_desc);
                }
            }

            // concatenate the source_id to the cdc_table_id
            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
            schema_changes.push(TableSchemaChange {
                cdc_table_id,
                columns: column_descs
                    .into_iter()
                    .map(|column_desc| ColumnCatalog {
                        column_desc,
                        is_hidden: false,
                    })
                    .collect_vec(),
                change_type: ty.as_str().into(),
                upstream_ddl: upstream_ddl.clone(),
            });
        }

        Ok(SchemaChangeEnvelope {
            table_changes: schema_changes,
        })
    } else {
        Err(AccessError::Undefined {
            name: "table schema change".into(),
            path: TABLE_CHANGES.into(),
        })
    }
}
516
517impl<A> DebeziumChangeEvent<A>
518where
519    A: Access,
520{
521    /// Panic: one of the `key_accessor` or `value_accessor` must be provided.
522    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
523        assert!(key_accessor.is_some() || value_accessor.is_some());
524        Self {
525            value_accessor,
526            key_accessor,
527            is_mongodb: false,
528        }
529    }
530
531    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
532        assert!(key_accessor.is_some() || value_accessor.is_some());
533        Self {
534            value_accessor,
535            key_accessor,
536            is_mongodb: true,
537        }
538    }
539
540    /// Returns the transaction metadata if exists.
541    ///
542    /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details.
543    pub(crate) fn transaction_control(
544        &self,
545        connector_props: &ConnectorProperties,
546    ) -> Option<TransactionControl> {
547        // Ignore if `value_accessor` is not provided or there's any error when
548        // trying to parse the transaction metadata.
549        self.value_accessor
550            .as_ref()
551            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
552    }
553}
554
impl<A> ChangeEvent for DebeziumChangeEvent<A>
where
    A: Access,
{
    /// Extracts the value of column `desc` from this event.
    ///
    /// Delete events read from `payload.before` (falling back to the message key);
    /// upsert events read from `payload.after`, except additional metadata columns,
    /// which are read from `payload.source.*`.
    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
        match self.op()? {
            ChangeEventOperation::Delete => {
                // For delete events of MongoDB, the "before" and "after" field both are null in the value,
                // we need to extract the _id field from the key.
                if self.is_mongodb && desc.name == "_id" {
                    return self
                        .key_accessor
                        .as_ref()
                        .expect("key_accessor must be provided for delete operation")
                        .access(&[&desc.name], &desc.data_type);
                }

                if let Some(va) = self.value_accessor.as_ref() {
                    va.access(&[BEFORE, &desc.name], &desc.data_type)
                } else {
                    // Tombstone-style message: no value, so the key carries the columns.
                    self.key_accessor
                        .as_ref()
                        .unwrap()
                        .access(&[&desc.name], &desc.data_type)
                }
            }

            // value should not be None.
            ChangeEventOperation::Upsert => {
                // For upsert operation, if desc is an additional column, access field in the `SOURCE` field.
                desc.additional_column.column_type.as_ref().map_or_else(
                    || {
                        // Regular data column: read from `payload.after`.
                        self.value_accessor
                            .as_ref()
                            .expect("value_accessor must be provided for upsert operation")
                            .access(&[AFTER, &desc.name], &desc.data_type)
                    },
                    |additional_column_type| {
                        // Metadata column: read the corresponding `payload.source.*` field.
                        match *additional_column_type {
                            ColumnType::Timestamp(_) => {
                                // access payload.source.ts_ms
                                let ts_ms = self
                                    .value_accessor
                                    .as_ref()
                                    .expect("value_accessor must be provided for upsert operation")
                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
                                    Timestamptz::from_millis(scalar.into_int64())
                                        .expect("source.ts_ms must in millisecond")
                                        .to_scalar_value()
                                })))
                            }
                            ColumnType::DatabaseName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
                            ColumnType::SchemaName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
                            ColumnType::TableName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
                            ColumnType::CollectionName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
                            _ => Err(AccessError::UnsupportedAdditionalColumn {
                                name: desc.name.clone(),
                            }),
                        }
                    },
                )
            }
        }
    }

    /// Determines the operation of this event from the Debezium `op` field:
    /// `r`/`c`/`u` → upsert, `d` → delete. A missing value accessor is treated
    /// as a delete (tombstone message).
    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
        if let Some(accessor) = &self.value_accessor {
            if let Some(ScalarRefImpl::Utf8(op)) =
                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
            {
                match op {
                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
                        return Ok(ChangeEventOperation::Upsert);
                    }
                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
                    _ => (),
                }
            }
            Err(super::AccessError::Undefined {
                name: "op".into(),
                path: Default::default(),
            })
        } else {
            Ok(ChangeEventOperation::Delete)
        }
    }
}
659
/// Access support for Mongo
///
/// For now, we consider `strong_schema` typed `MongoDB` Debezium event JSONs only.
pub struct MongoJsonAccess<A> {
    /// Underlying accessor over the raw Debezium JSON message.
    accessor: A,
    /// Whether the source declares typed columns for the document fields.
    /// When `false`, non-`_id` fields are exposed through a single JSONB
    /// `payload` column instead of being extracted individually.
    strong_schema: bool,
}
667
668pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
669    let id_field = if let Some(value) = bson_doc.get("_id") {
670        value
671    } else {
672        bson_doc
673    };
674
675    let type_error = || AccessError::TypeError {
676        expected: id_type.to_string(),
677        got: match id_field {
678            serde_json::Value::Null => "null",
679            serde_json::Value::Bool(_) => "bool",
680            serde_json::Value::Number(_) => "number",
681            serde_json::Value::String(_) => "string",
682            serde_json::Value::Array(_) => "array",
683            serde_json::Value::Object(_) => "object",
684        }
685        .to_owned(),
686        value: id_field.to_string(),
687    };
688
689    let id: Datum = match id_type {
690        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
691        DataType::Varchar => match id_field {
692            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
693            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
694                obj["$oid"].as_str().unwrap_or_default().into(),
695            )),
696            _ => return Err(type_error()),
697        },
698        DataType::Int32 => {
699            if let serde_json::Value::Object(obj) = id_field
700                && obj.contains_key("$numberInt")
701            {
702                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
703                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
704            } else {
705                return Err(type_error());
706            }
707        }
708        DataType::Int64 => {
709            if let serde_json::Value::Object(obj) = id_field
710                && obj.contains_key("$numberLong")
711            {
712                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
713                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
714            } else {
715                return Err(type_error());
716            }
717        }
718        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
719    };
720    Ok(id)
721}
722
723/// Extract the field data from the bson document
724///
725/// BSON document is a JSON object with some special fields, such as:
726/// long integer: {"$numberLong": "1630454400000"}
727/// date time: {"$date": {"$numberLong": "1630454400000"}}
728///
729/// For now, we support only the Canonical format of the date and timestamp.
730///
731/// # NOTE:
732///
733/// - `field` indicates the field name in the bson document, if it is None, the `bson_doc` is the field itself.
734// similar to extract the "_id" field from the message payload
735pub fn extract_bson_field(
736    type_expected: &DataType,
737    bson_doc: &serde_json::Value,
738    field: Option<&str>,
739) -> AccessResult {
740    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
741        expected: type_expected.to_string(),
742        got: match bson_doc {
743            serde_json::Value::Null => "null",
744            serde_json::Value::Bool(_) => "bool",
745            serde_json::Value::Number(_) => "number",
746            serde_json::Value::String(_) => "string",
747            serde_json::Value::Array(_) => "array",
748            serde_json::Value::Object(_) => "object",
749        }
750        .to_owned(),
751        value: datum.to_string(),
752    };
753
754    let datum = if let Some(field) = field {
755        let Some(bson_doc) = bson_doc.get(field) else {
756            return Err(type_error(bson_doc));
757        };
758        bson_doc
759    } else {
760        bson_doc
761    };
762
763    if datum.is_null() {
764        return Ok(None);
765    }
766
767    let field_datum: Datum = match type_expected {
768        DataType::Boolean => {
769            if datum.is_boolean() {
770                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
771            } else {
772                return Err(type_error(datum));
773            }
774        }
775        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
776        DataType::Varchar => match datum {
777            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
778            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
779                obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
780            }
781            _ => return Err(type_error(datum)),
782        },
783        DataType::Int16
784        | DataType::Int32
785        | DataType::Int64
786        | DataType::Int256
787        | DataType::Float32
788        | DataType::Float64 => {
789            if !datum.is_object() {
790                return Err(type_error(datum));
791            };
792
793            bson_extract_number(datum, type_expected)?
794        }
795
796        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
797            if let serde_json::Value::Object(mp) = datum {
798                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
799                    bson_extract_timestamp(datum, type_expected)?
800                } else if mp.contains_key("$date") {
801                    bson_extract_date(datum, type_expected)?
802                } else {
803                    return Err(type_error(datum));
804                }
805            } else {
806                return Err(type_error(datum));
807            }
808        }
809        DataType::Decimal => {
810            if let serde_json::Value::Object(obj) = datum
811                && obj.contains_key("$numberDecimal")
812                && obj["$numberDecimal"].is_string()
813            {
814                let number = obj["$numberDecimal"].as_str().unwrap();
815
816                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
817                    AccessError::TypeError {
818                        expected: type_expected.to_string(),
819                        got: "unparsable string".into(),
820                        value: number.to_owned(),
821                    }
822                })?;
823                Some(ScalarImpl::Decimal(dec))
824            } else {
825                return Err(type_error(datum));
826            }
827        }
828
829        DataType::Bytea => {
830            if let serde_json::Value::Object(obj) = datum
831                && obj.contains_key("$binary")
832                && obj["$binary"].is_object()
833            {
834                use base64::Engine;
835
836                let binary = obj["$binary"].as_object().unwrap();
837
838                if !binary.contains_key("$base64")
839                    || !binary["$base64"].is_string()
840                    || !binary.contains_key("$subType")
841                    || !binary["$subType"].is_string()
842                {
843                    return Err(AccessError::TypeError {
844                        expected: type_expected.to_string(),
845                        got: "object".into(),
846                        value: datum.to_string(),
847                    });
848                }
849
850                let b64_str = binary["$base64"]
851                    .as_str()
852                    .ok_or_else(|| AccessError::TypeError {
853                        expected: type_expected.to_string(),
854                        got: "object".into(),
855                        value: datum.to_string(),
856                    })?;
857
858                // type is not used for now
859                let _type_str =
860                    binary["$subType"]
861                        .as_str()
862                        .ok_or_else(|| AccessError::TypeError {
863                            expected: type_expected.to_string(),
864                            got: "object".into(),
865                            value: datum.to_string(),
866                        })?;
867
868                let bytes = base64::prelude::BASE64_STANDARD
869                    .decode(b64_str)
870                    .map_err(|_| AccessError::TypeError {
871                        expected: "$binary object with $base64 string and $subType string field"
872                            .to_owned(),
873                        got: "string".to_owned(),
874                        value: bson_doc.to_string(),
875                    })?;
876                let bytea = ScalarImpl::Bytea(bytes.into());
877                Some(bytea)
878            } else {
879                return Err(type_error(datum));
880            }
881        }
882
883        DataType::Struct(struct_fields) => {
884            let mut datums = vec![];
885            for (field_name, field_type) in struct_fields.iter() {
886                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
887                datums.push(field_datum);
888            }
889            let value = StructValue::new(datums);
890
891            Some(ScalarImpl::Struct(value))
892        }
893
894        DataType::List(list_type) => {
895            let elem_type = list_type.elem();
896            let Some(d_array) = datum.as_array() else {
897                return Err(type_error(datum));
898            };
899
900            let mut builder = elem_type.create_array_builder(d_array.len());
901            for item in d_array {
902                builder.append(extract_bson_field(elem_type, item, None)?);
903            }
904            Some(ScalarImpl::from(ListValue::new(builder.finish())))
905        }
906
907        _ => {
908            if let Some(field_name) = field {
909                unreachable!(
910                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
911                )
912            } else {
913                let type_expected = type_expected.to_string();
914                unreachable!(
915                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
916                )
917            }
918        }
919    };
920    Ok(field_datum)
921}
922
/// Convert an Extended JSON numeric wrapper object (e.g. `{"$numberInt": "42"}`)
/// into the expected RisingWave numeric scalar.
///
/// The caller (`extract_bson_field`) has already verified that `bson_doc` is a
/// JSON object. The wrapper value is expected to be the number rendered as a
/// string (Canonical format); any other JSON kind yields a type error.
fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    // Map the target type to its Extended JSON wrapper key. Int16 shares
    // "$numberInt" with Int32 (with an explicit range check below), and Int256
    // is parsed from the "$numberLong" string representation.
    let field_name = match type_expected {
        DataType::Int16 => "$numberInt",
        DataType::Int32 => "$numberInt",
        DataType::Int64 => "$numberLong",
        DataType::Int256 => "$numberLong",
        DataType::Float32 => "$numberDouble",
        DataType::Float64 => "$numberDouble",
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    // Wrapper key missing entirely: report the whole wrapper object.
    let datum = bson_doc.get(field_name);
    if datum.is_none() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    let datum = datum.unwrap();

    // Canonical format stores the number as a string.
    if datum.is_string() {
        let Some(num_str) = datum.as_str() else {
            return Err(AccessError::TypeError {
                expected: type_expected.to_string(),
                got: "string".into(),
                value: datum.to_string(),
            });
        };
        // parse to float
        if [DataType::Float32, DataType::Float64].contains(type_expected) {
            // Extended JSON spells out the IEEE special values as literal
            // strings, which f64::from_str would not accept in this form.
            match (num_str, type_expected) {
                ("Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
                }
                ("Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
                }
                ("-Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
                }
                ("-Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
                }
                ("NaN", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
                }
                ("NaN", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
                }
                _ => {}
            }

            // Parse as f64 first; narrow to f32 afterwards if required.
            let parsed_num: f64 = match num_str.parse() {
                Ok(n) => n,
                Err(_e) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            if *type_expected == DataType::Float64 {
                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
            } else {
                let parsed_num = parsed_num as f32;
                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
            }
        }
        // parse to large int
        if *type_expected == DataType::Int256 {
            let parsed_num = match Int256::from_str(num_str) {
                Ok(n) => n,
                Err(_) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            return Ok(Some(ScalarImpl::Int256(parsed_num)));
        }

        // parse to integer
        let parsed_num: i64 = match num_str.parse() {
            Ok(n) => n,
            Err(_e) => {
                return Err(AccessError::TypeError {
                    expected: type_expected.to_string(),
                    got: "string".into(),
                    value: num_str.to_owned(),
                });
            }
        };
        // Narrow to the target width with explicit range checks (the `as`
        // casts below are safe because of these checks).
        match type_expected {
            DataType::Int16 => {
                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
            }
            DataType::Int32 => {
                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
            }
            DataType::Int64 => {
                return Ok(Some(ScalarImpl::Int64(parsed_num)));
            }
            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
        }
    }
    // From here on the wrapper value was not a string: classify the actual
    // JSON kind for the error message.
    // NOTE(review): the null/boolean/number branches embed the outer wrapper
    // (`bson_doc`) as the error value while array/object embed `datum` — this
    // inconsistency is preserved as-is; confirm before unifying.
    if datum.is_null() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "null".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_array() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "array".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_object() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_boolean() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "boolean".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_number() {
        // A bare JSON number (Relaxed format) is rejected: only the string
        // (Canonical) encoding is accepted here.
        let got_type = if datum.is_f64() { "f64" } else { "i64" };
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: got_type.into(),
            value: bson_doc.to_string(),
        });
    }

    Err(AccessError::TypeError {
        expected: type_expected.to_string(),
        got: "unknown".into(),
        value: bson_doc.to_string(),
    })
}
1094
/// Convert an Extended JSON `$date` wrapper into the expected date/time type.
///
/// `bson_doc` is the wrapper object containing the `$date` key (the caller
/// has already checked its presence). The resulting instant (milliseconds
/// since the Unix epoch, UTC) is projected onto `Date`, `Time`, `Timestamp`
/// or `Timestamptz`.
fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    // according to mongodb extended json v2
    // the date could be:
    //
    // the timestamp type could be:
    //
    // both Canonical and Relaxed format:
    // {"$timestamp": {"t": 1630454400, "i": 1}}
    //
    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
    // date is encoded as number of milliseconds since the Unix epoch
    //
    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
    // date is encoded as ISO8601 string

    let datum = &bson_doc["$date"];

    // NOTE(review): `got` classifies the outer wrapper object (always
    // "object" here) rather than the inner `$date` value; preserved as-is.
    let type_error = || AccessError::TypeError {
        expected: type_expected.to_string(),
        got: match bson_doc {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
        .to_owned(),
        value: datum.to_string(),
    };

    // `$date` may appear in Canonical (v2), Relaxed (v2), or legacy v1 form;
    // each is normalized to milliseconds since the Unix epoch.
    let millis = match datum {
        // Canonical format {"$date": {"$numberLong": "1630454400000"}}
        serde_json::Value::Object(obj)
            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
        {
            obj["$numberLong"]
                .as_str()
                .unwrap()
                .parse::<i64>()
                .map_err(|_| AccessError::TypeError {
                    expected: "timestamp".into(),
                    got: "object".into(),
                    value: datum.to_string(),
                })?
        }
        // Relaxed format {"$date": "2021-09-01T00:00:00.000Z"}
        serde_json::Value::String(s) => {
            let dt =
                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
                    expected: "valid ISO-8601 date string".into(),
                    got: "string".into(),
                    value: datum.to_string(),
                })?;
            dt.timestamp_millis()
        }

        // jsonv1 format
        // {"$date": 1630454400000}
        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
            expected: "timestamp".into(),
            got: "number".into(),
            value: datum.to_string(),
        })?,

        _ => return Err(type_error()),
    };

    // Fails only if `millis` is outside chrono's representable range.
    let datetime =
        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
            expected: "timestamp".into(),
            got: "object".into(),
            value: datum.to_string(),
        })?;

    // Project the UTC instant onto the expected column type.
    let res = match type_expected {
        DataType::Date => {
            let naive = datetime.naive_local();
            let dt = naive.date();
            Some(ScalarImpl::Date(dt.into()))
        }
        DataType::Time => {
            let naive = datetime.naive_local();
            let dt = naive.time();
            Some(ScalarImpl::Time(dt.into()))
        }
        DataType::Timestamp => {
            let naive = datetime.naive_local();
            let dt = Timestamp::from(naive);
            Some(ScalarImpl::Timestamp(dt))
        }
        DataType::Timestamptz => {
            let dt = datetime.into();
            Some(ScalarImpl::Timestamptz(dt))
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };
    Ok(res)
}
1195
/// Convert an Extended JSON `$timestamp` wrapper into the expected date/time
/// type.
///
/// `bson_doc` is the wrapper object containing the `$timestamp` key. Only the
/// `t` component (whole seconds since the Unix epoch) is used for the value;
/// `i` (the ordinal within the second) is validated for presence/type but
/// otherwise ignored.
fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    // according to mongodb extended json v2
    // the date could be:
    //
    // the timestamp type could be:
    //
    // both Canonical and Relaxed format:
    // {"$timestamp": {"t": 1630454400, "i": 1}}
    // t is the number of seconds since the Unix epoch
    //
    // Canonical: {"$date": {"$numberLong": "1630454400000"}}
    // date is encoded as number of milliseconds since the Unix epoch
    //
    // Relaxed: {"$date": "2021-09-01T00:00:00.000Z"}
    // date is encoded as ISO8601 string
    //
    // *For now, we support the Canonical format only.*

    let Some(obj) = bson_doc["$timestamp"].as_object() else {
        return Err(AccessError::TypeError {
            expected: "timestamp".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    };

    // Both `t` and `i` must be present as non-negative integers.
    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
    {
        return Err(AccessError::TypeError {
            expected: "timestamp with valid seconds since epoch".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    // `as_i64` can still fail here if `t` exceeds i64::MAX despite being u64.
    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
        expected: "timestamp with valid seconds since epoch".into(),
        got: "object".into(),
        value: bson_doc.to_string(),
    })?;

    // Fails only if the value is outside chrono's representable range.
    let chrono_datetime =
        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: bson_doc.to_string(),
        })?;

    // Project the UTC instant onto the expected column type.
    let res = match type_expected {
        DataType::Date => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.date();
            Some(ScalarImpl::Date(dt.into()))
        }
        DataType::Time => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.time();
            Some(ScalarImpl::Time(dt.into()))
        }
        DataType::Timestamp => {
            let naive = chrono_datetime.naive_local();
            let dt = Timestamp::from(naive);
            Some(ScalarImpl::Timestamp(dt))
        }
        DataType::Timestamptz => {
            let dt = chrono_datetime.into();
            Some(ScalarImpl::Timestamptz(dt))
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    Ok(res)
}
1269
impl<A> MongoJsonAccess<A> {
    /// Creates a Mongo accessor wrapping `accessor`.
    ///
    /// `strong_schema` selects whether document fields are extracted with
    /// their declared column types (`true`) or exposed as a raw JSONB
    /// `payload` column (`false`).
    pub fn new(accessor: A, strong_schema: bool) -> Self {
        Self {
            accessor,
            strong_schema,
        }
    }
}
1278
impl<A> Access for MongoJsonAccess<A>
where
    A: Access,
{
    /// Resolve `path` against a Debezium MongoDB envelope.
    ///
    /// The row payloads (`after`/`before`) are read as JSONB first and then
    /// converted to the expected column type with the `extract_bson_*`
    /// helpers; everything else is delegated to the inner accessor.
    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
        match path {
            // `_id` inside the row payload: fetch the whole payload as JSONB,
            // then extract and convert the id field.
            ["after" | "before", "_id"] => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                } else {
                    // fail to extract the "_id" field from the message payload
                    Err(AccessError::Undefined {
                        name: "_id".to_owned(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            // Weak schema: the whole document is exposed as one JSONB column.
            ["after" | "before", "payload"] if !self.strong_schema => {
                self.access(&[path[0]], &DataType::Jsonb)
            }

            // Strong schema: extract the named field with its declared type.
            ["after" | "before", field] if self.strong_schema => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
                } else {
                    // fail to extract the expected field from the message payload
                    Err(AccessError::Undefined {
                        name: field.to_string(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
            // In addition, the "_id" field is named as "id" in the key. An example of message key:
            // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
            ["_id"] => {
                // Try the payload first; only fall back to the key's "id"
                // field when the payload access reports it as undefined.
                let ret = self.accessor.access(path, type_expected);
                if matches!(ret, Err(AccessError::Undefined { .. })) {
                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                    } else {
                        // fail to extract the "_id" field from the message key
                        Err(AccessError::Undefined {
                            name: "_id".to_owned(),
                            path: "id".to_owned(),
                        })?
                    }
                } else {
                    ret
                }
            }
            // Any other path is passed through unchanged.
            _ => self.accessor.access(path, type_expected),
        }
    }
}