1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21 DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22 Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29
/// Numeric JDBC type codes carried in Debezium schema-change messages
/// (the `jdbcType` column attribute). Values follow `java.sql.Types`.
mod debezium_sql_types {
    /// `java.sql.Types.STRUCT` — used to detect PostgreSQL composite types.
    pub const STRUCT: i32 = 2002;
}
34use crate::connector_common::{create_pg_client_from_properties, discover_pgvector_dimensions};
35use crate::parser::TransactionControl;
36use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
37use crate::parser::schema_change::TableChangeType;
38use crate::source::cdc::build_cdc_table_id;
39use crate::source::cdc::external::mysql::{
40 mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
41};
42use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
43use crate::source::{ConnectorProperties, SourceColumnDesc};
44
/// Extracts the `(schema, table)` pair from a Debezium table identifier.
///
/// Handles both the quoted form (`"db"."schema"."table"`) and the plain
/// dotted form (`db.schema.table`); in either case the last two components
/// are interpreted as schema and table. Returns `None` when fewer than two
/// non-empty components are present.
fn parse_schema_table_from_debezium_id(id: &str) -> Option<(String, String)> {
    let trimmed = id.trim();

    // Quoted form: components are separated by `"."` sequences.
    if trimmed.contains("\".\"") {
        let components: Vec<&str> = trimmed
            .split("\".\"")
            .map(|part| part.trim_matches('"').trim())
            .collect();
        if let [.., schema, table] = components[..] {
            if !schema.is_empty() && !table.is_empty() {
                return Some((schema.to_owned(), table.to_owned()));
            }
        }
    }

    // Plain form: strip surrounding quotes from the whole id, then split on `.`.
    let unquoted = trimmed.trim_matches('"');
    let components: Vec<&str> = unquoted.split('.').collect();
    if let [.., schema, table] = components[..] {
        let schema = schema.trim();
        let table = table.trim();
        if !schema.is_empty() && !table.is_empty() {
            return Some((schema.to_owned(), table.to_owned()));
        }
    }
    None
}
86
87async fn fetch_pgvector_dimensions_for_table(
88 connector_props: &ConnectorProperties,
89 schema: &str,
90 table: &str,
91) -> AccessResult<std::collections::HashMap<String, usize>> {
92 let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
93 return Ok(std::collections::HashMap::new());
94 };
95
96 let client = create_pg_client_from_properties(&cdc_props.properties, None)
97 .await
98 .map_err(|err| AccessError::Uncategorized {
99 message: format!(
100 "failed to connect upstream postgres for schema change lookup: {}",
101 err.as_report()
102 ),
103 })?;
104
105 discover_pgvector_dimensions(&client, schema, table)
106 .await
107 .map_err(|err| AccessError::Uncategorized {
108 message: format!(
109 "failed to query upstream postgres schema for {schema}.{table}: {}",
110 err.as_report()
111 ),
112 })
113}
114
/// A Debezium (CDC) change event, built from the message's key and/or
/// value accessors.
pub struct DebeziumChangeEvent<A> {
    // Accessor over the message value; `None` is treated as a delete
    // (tombstone) in `op()`.
    value_accessor: Option<A>,
    // Accessor over the message key; used for deletes when no value exists.
    key_accessor: Option<A>,
    // Whether the event comes from a MongoDB source (special `_id` handling).
    is_mongodb: bool,
}
153
// Row-image fields of the Debezium envelope.
const BEFORE: &str = "before";
const AFTER: &str = "after";

// Fields of the schema-change payload and the `source` metadata section.
const UPSTREAM_DDL: &str = "ddl";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

// Operation field and transaction-metadata fields.
const OP: &str = "op";
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";

// Field holding the list of table changes in a schema-change message.
pub const TABLE_CHANGES: &str = "tableChanges";

// Debezium operation codes: read (snapshot), create, update, delete.
pub const DEBEZIUM_READ_OP: &str = "r";
pub const DEBEZIUM_CREATE_OP: &str = "c";
pub const DEBEZIUM_UPDATE_OP: &str = "u";
pub const DEBEZIUM_DELETE_OP: &str = "d";

// Transaction boundary markers in transaction metadata messages.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
178
179pub fn parse_transaction_meta(
180 accessor: &impl Access,
181 connector_props: &ConnectorProperties,
182) -> AccessResult<TransactionControl> {
183 if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
184 accessor
185 .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
186 .to_datum_ref(),
187 accessor
188 .access(&[TRANSACTION_ID], &DataType::Varchar)?
189 .to_datum_ref(),
190 ) {
191 match status {
196 DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
197 ConnectorProperties::PostgresCdc(_) => {
198 let (tx_id, _) = id.split_once(':').unwrap();
199 return Ok(TransactionControl::Begin { id: tx_id.into() });
200 }
201 ConnectorProperties::MysqlCdc(_) => {
202 return Ok(TransactionControl::Begin { id: id.into() });
203 }
204 ConnectorProperties::SqlServerCdc(_) => {
205 return Ok(TransactionControl::Begin { id: id.into() });
206 }
207 _ => {}
208 },
209 DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
210 ConnectorProperties::PostgresCdc(_) => {
211 let (tx_id, _) = id.split_once(':').unwrap();
212 return Ok(TransactionControl::Commit { id: tx_id.into() });
213 }
214 ConnectorProperties::MysqlCdc(_) => {
215 return Ok(TransactionControl::Commit { id: id.into() });
216 }
217 ConnectorProperties::SqlServerCdc(_) => {
218 return Ok(TransactionControl::Commit { id: id.into() });
219 }
220 _ => {}
221 },
222 _ => {}
223 }
224 }
225
226 Err(AccessError::Undefined {
227 name: "transaction status".into(),
228 path: TRANSACTION_STATUS.into(),
229 })
230}
231
/// Accesses `$field` of a JSONB object `$col` and converts it with the
/// corresponding `as_<type>` accessor (e.g. `string`, `number`).
///
/// NOTE(review): both the field lookup and the conversion `unwrap()`, so a
/// missing field or a type mismatch panics — callers rely on Debezium
/// producing well-formed schema-change payloads.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}
239
/// Parses a Debezium schema-change message into a [`SchemaChangeEnvelope`].
///
/// Create/drop table changes are skipped; only alter-style changes are
/// emitted. Upstream (Postgres/MySQL) column type names are mapped to
/// RisingWave [`DataType`]s, and constant default-value expressions are
/// preserved on the resulting column descriptors.
///
/// # Errors
/// Returns [`AccessError::CdcAutoSchemaChangeError`] for unsupported
/// upstream types and [`AccessError::Undefined`] when the message has no
/// `tableChanges` field.
pub async fn parse_schema_change(
    accessor: &impl Access,
    source_id: SourceId,
    source_name: &str,
    connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
    let mut schema_changes = vec![];
    // (schema, table) -> {column name -> pgvector dimension}; memoizes the
    // upstream lookup so each table is queried at most once per message.
    let mut pgvector_dims_cache: std::collections::HashMap<
        (String, String),
        std::collections::HashMap<String, usize>,
    > = std::collections::HashMap::new();

    // The raw DDL statement that triggered this schema change.
    let upstream_ddl: String = accessor
        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
        .to_owned_datum()
        .unwrap()
        .as_utf8()
        .to_string();

    if let Some(ScalarRefImpl::List(table_changes)) = accessor
        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
        .to_datum_ref()
    {
        for datum in table_changes.iter() {
            let jsonb = match datum {
                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
                _ => unreachable!(""),
            };
            // `id` is the (possibly quoted) fully-qualified table name.
            let id: String = jsonb_access_field!(jsonb, "id", string);
            let ty = jsonb_access_field!(jsonb, "type", string);

            let table_name = id.trim_matches('"').to_owned();
            let ddl_type: TableChangeType = ty.as_str().into();
            // Only ALTER-style changes are propagated.
            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
                tracing::debug!("skip table schema change for create/drop command");
                continue;
            }

            let mut column_descs: Vec<ColumnDesc> = vec![];
            if let Some(table) = jsonb.access_object_field("table")
                && let Some(columns) = table.access_object_field("columns")
            {
                for col in columns.array_elements().unwrap() {
                    let name = jsonb_access_field!(col, "name", string);
                    let type_name = jsonb_access_field!(col, "typeName", string);
                    // Postgres user-defined types: enums carry `enumValues`,
                    // composites report JDBC STRUCT in `jdbcType`.
                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
                    let is_composite = col
                        .access_object_field("jdbcType")
                        .and_then(|v| v.as_number().ok())
                        .map(|n| n.0 as i32)
                        == Some(debezium_sql_types::STRUCT);

                    let data_type = match *connector_props {
                        ConnectorProperties::PostgresCdc(_) => {
                            if is_composite || is_enum {
                                // User-defined types are mapped to VARCHAR.
                                tracing::debug!(target: "auto_schema_change",
                                    "Convert PostgreSQL user-defined type '{}' ({}) to VARCHAR",
                                    type_name,
                                    if is_composite { "composite" } else { "enum" });
                                DataType::Varchar
                            } else if type_name.eq_ignore_ascii_case("vector") {
                                // pgvector: the dimension is not in the Debezium
                                // payload, so query the upstream table for it.
                                let Some((schema_name, table_name_only)) =
                                    parse_schema_table_from_debezium_id(id.as_str())
                                else {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                };

                                let cache_key = (schema_name, table_name_only);
                                if !pgvector_dims_cache.contains_key(&cache_key) {
                                    let fetched = fetch_pgvector_dimensions_for_table(
                                        connector_props,
                                        &cache_key.0,
                                        &cache_key.1,
                                    )
                                    .await?;
                                    pgvector_dims_cache.insert(cache_key.clone(), fetched);
                                }

                                match pgvector_dims_cache
                                    .get(&cache_key)
                                    .and_then(|m| m.get(name.as_str()).copied())
                                {
                                    // Dimension must be known and within bounds.
                                    Some(dim) if (1..=DataType::VEC_MAX_SIZE).contains(&dim) => {
                                        DataType::Vector(dim)
                                    }
                                    _ => {
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            } else {
                                // Regular Postgres type: map by type name.
                                let ty = type_name_to_pg_type(type_name.as_str());
                                match ty {
                                    Some(ty) => match pg_type_to_rw_type(&ty) {
                                        Ok(data_type) => data_type,
                                        Err(err) => {
                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
                                            return Err(AccessError::CdcAutoSchemaChangeError {
                                                ty: type_name,
                                                table_name: format!(
                                                    "{}.{}",
                                                    source_name, table_name
                                                ),
                                            });
                                        }
                                    },
                                    None => {
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            }
                        }
                        ConnectorProperties::MysqlCdc(_) => {
                            // MySQL type: map by type name.
                            let ty = type_name_to_mysql_type(type_name.as_str());
                            match ty {
                                Some(ty) => match mysql_type_to_rw_type(&ty) {
                                    Ok(data_type) => data_type,
                                    Err(err) => {
                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                },
                                None => {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                }
                            }
                        }
                        _ => {
                            // Schema change is only supported for PG/MySQL CDC.
                            unreachable!()
                        }
                    };

                    // Carry over a constant default-value expression if any.
                    let column_desc = match col.access_object_field("defaultValueExpression") {
                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
                            let value_text: Option<String>;
                            match *connector_props {
                                ConnectorProperties::PostgresCdc(_) => {
                                    // Strip a Postgres cast suffix like `'x'::text`.
                                    match default_val_expr_str
                                        .split("::")
                                        .map(|s| s.trim_matches('\''))
                                        .next()
                                    {
                                        None => {
                                            value_text = None;
                                        }
                                        Some(val_text) => {
                                            value_text = Some(val_text.to_owned());
                                        }
                                    }
                                }
                                ConnectorProperties::MysqlCdc(_) => {
                                    if data_type == DataType::Timestamptz {
                                        // MySQL TIMESTAMP defaults need conversion
                                        // to a timestamptz literal.
                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
                                            AccessError::TypeError {
                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
                                                got: data_type.to_string(),
                                                value: default_val_expr_str.to_owned(),
                                            }
                                        })?);
                                    } else {
                                        value_text = Some(default_val_expr_str.to_owned());
                                    }
                                }
                                _ => {
                                    unreachable!("connector doesn't support schema change")
                                }
                            }

                            // Non-constant expressions fail `from_text` and are
                            // dropped with a warning (column added without default).
                            let snapshot_value: Datum = value_text.and_then(|value_text| {
                                ScalarImpl::from_text(value_text.as_str(), &data_type)
                                    .inspect_err(|err| {
                                        tracing::warn!(
                                            target: "auto_schema_change",
                                            error = %err.as_report(),
                                            column = %name,
                                            data_type = %data_type,
                                            default_value_expression = default_val_expr_str,
                                            upstream_ddl = %upstream_ddl,
                                            "non-constant default expression, column added without default. \
                                            If this column is not newly added by this schema change, it is safe to ignore this warning. \
                                            If this column is newly added by this schema change, existing rows will be NULL in this column — consider using COALESCE in queries to provide a fallback value."
                                        );
                                    })
                                    .ok()
                            });

                            if snapshot_value.is_none() {
                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
                            } else {
                                ColumnDesc::named_with_default_value(
                                    name,
                                    ColumnId::placeholder(),
                                    data_type,
                                    snapshot_value,
                                )
                            }
                        }
                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
                    };
                    column_descs.push(column_desc);
                }
            }

            // The CDC table id is built from the unquoted fully-qualified name.
            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
            schema_changes.push(TableSchemaChange {
                cdc_table_id,
                columns: column_descs
                    .into_iter()
                    .map(|column_desc| ColumnCatalog {
                        column_desc,
                        is_hidden: false,
                    })
                    .collect_vec(),
                change_type: ty.as_str().into(),
                upstream_ddl: upstream_ddl.clone(),
            });
        }

        Ok(SchemaChangeEnvelope {
            table_changes: schema_changes,
        })
    } else {
        Err(AccessError::Undefined {
            name: "table schema change".into(),
            path: TABLE_CHANGES.into(),
        })
    }
}
516
517impl<A> DebeziumChangeEvent<A>
518where
519 A: Access,
520{
521 pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
523 assert!(key_accessor.is_some() || value_accessor.is_some());
524 Self {
525 value_accessor,
526 key_accessor,
527 is_mongodb: false,
528 }
529 }
530
531 pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
532 assert!(key_accessor.is_some() || value_accessor.is_some());
533 Self {
534 value_accessor,
535 key_accessor,
536 is_mongodb: true,
537 }
538 }
539
540 pub(crate) fn transaction_control(
544 &self,
545 connector_props: &ConnectorProperties,
546 ) -> Option<TransactionControl> {
547 self.value_accessor
550 .as_ref()
551 .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
552 }
553}
554
impl<A> ChangeEvent for DebeziumChangeEvent<A>
where
    A: Access,
{
    /// Reads the value of column `desc` from this change event.
    ///
    /// Deletes read from the `before` image (or the message key for
    /// tombstones); upserts read from the `after` image, except for
    /// additional metadata columns which read from the `source` section.
    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
        match self.op()? {
            ChangeEventOperation::Delete => {
                // MongoDB deletes carry `_id` only in the message key.
                if self.is_mongodb && desc.name == "_id" {
                    return self
                        .key_accessor
                        .as_ref()
                        .expect("key_accessor must be provided for delete operation")
                        .access(&[&desc.name], &desc.data_type);
                }

                // Prefer the `before` image from the value; for tombstones
                // (no value) fall back to the message key.
                if let Some(va) = self.value_accessor.as_ref() {
                    va.access(&[BEFORE, &desc.name], &desc.data_type)
                } else {
                    self.key_accessor
                        .as_ref()
                        .unwrap()
                        .access(&[&desc.name], &desc.data_type)
                }
            }

            ChangeEventOperation::Upsert => {
                // Regular columns come from `after`; additional columns are
                // resolved against the `source` metadata section.
                desc.additional_column.column_type.as_ref().map_or_else(
                    || {
                        self.value_accessor
                            .as_ref()
                            .expect("value_accessor must be provided for upsert operation")
                            .access(&[AFTER, &desc.name], &desc.data_type)
                    },
                    |additional_column_type| {
                        match *additional_column_type {
                            ColumnType::Timestamp(_) => {
                                // `source.ts_ms` is epoch milliseconds.
                                let ts_ms = self
                                    .value_accessor
                                    .as_ref()
                                    .expect("value_accessor must be provided for upsert operation")
                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
                                    Timestamptz::from_millis(scalar.into_int64())
                                        .expect("source.ts_ms must in millisecond")
                                        .to_scalar_value()
                                })))
                            }
                            ColumnType::DatabaseName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
                            ColumnType::SchemaName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
                            ColumnType::TableName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
                            ColumnType::CollectionName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
                            _ => Err(AccessError::UnsupportedAdditionalColumn {
                                name: desc.name.clone(),
                            }),
                        }
                    },
                )
            }
        }
    }

    /// Determines the operation from the `op` field of the message value.
    ///
    /// A missing value accessor (tombstone message) is treated as a delete.
    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
        if let Some(accessor) = &self.value_accessor {
            if let Some(ScalarRefImpl::Utf8(op)) =
                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
            {
                match op {
                    // Reads (snapshot), creates and updates are all upserts.
                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
                        return Ok(ChangeEventOperation::Upsert);
                    }
                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
                    _ => (),
                }
            }
            Err(super::AccessError::Undefined {
                name: "op".into(),
                path: Default::default(),
            })
        } else {
            Ok(ChangeEventOperation::Delete)
        }
    }
}
659
/// Wraps an accessor over a Debezium MongoDB JSON payload, translating
/// column accesses into BSON (MongoDB extended JSON) field extraction.
pub struct MongoJsonAccess<A> {
    accessor: A,
    // When true, individual fields under `before`/`after` are decoded into
    // strongly-typed columns; otherwise the payload stays a single JSONB.
    strong_schema: bool,
}
667
/// Extracts the MongoDB `_id` from an extended-JSON document, converting it
/// to `id_type`.
///
/// If `bson_doc` has an `_id` field it is used; otherwise the document
/// itself is treated as the id value.
///
/// NOTE(review): for Int32/Int64 ids, an unparsable `$numberInt`/
/// `$numberLong` string silently becomes 0 via `unwrap_or_default` —
/// confirm this is intended rather than surfacing a `TypeError`.
pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
    let id_field = if let Some(value) = bson_doc.get("_id") {
        value
    } else {
        bson_doc
    };

    // Describes the actual id value in a type-mismatch error.
    let type_error = || AccessError::TypeError {
        expected: id_type.to_string(),
        got: match id_field {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
        .to_owned(),
        value: id_field.to_string(),
    };

    let id: Datum = match id_type {
        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
        DataType::Varchar => match id_field {
            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
            // ObjectId form: `{"$oid": "..."}`.
            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
                obj["$oid"].as_str().unwrap_or_default().into(),
            )),
            _ => return Err(type_error()),
        },
        DataType::Int32 => {
            if let serde_json::Value::Object(obj) = id_field
                && obj.contains_key("$numberInt")
            {
                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
            } else {
                return Err(type_error());
            }
        }
        DataType::Int64 => {
            if let serde_json::Value::Object(obj) = id_field
                && obj.contains_key("$numberLong")
            {
                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
            } else {
                return Err(type_error());
            }
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
    };
    Ok(id)
}
722
723pub fn extract_bson_field(
736 type_expected: &DataType,
737 bson_doc: &serde_json::Value,
738 field: Option<&str>,
739) -> AccessResult {
740 let type_error = |datum: &serde_json::Value| AccessError::TypeError {
741 expected: type_expected.to_string(),
742 got: match bson_doc {
743 serde_json::Value::Null => "null",
744 serde_json::Value::Bool(_) => "bool",
745 serde_json::Value::Number(_) => "number",
746 serde_json::Value::String(_) => "string",
747 serde_json::Value::Array(_) => "array",
748 serde_json::Value::Object(_) => "object",
749 }
750 .to_owned(),
751 value: datum.to_string(),
752 };
753
754 let datum = if let Some(field) = field {
755 let Some(bson_doc) = bson_doc.get(field) else {
756 return Err(type_error(bson_doc));
757 };
758 bson_doc
759 } else {
760 bson_doc
761 };
762
763 if datum.is_null() {
764 return Ok(None);
765 }
766
767 let field_datum: Datum = match type_expected {
768 DataType::Boolean => {
769 if datum.is_boolean() {
770 Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
771 } else {
772 return Err(type_error(datum));
773 }
774 }
775 DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
776 DataType::Varchar => match datum {
777 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
778 serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
779 obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
780 }
781 _ => return Err(type_error(datum)),
782 },
783 DataType::Int16
784 | DataType::Int32
785 | DataType::Int64
786 | DataType::Int256
787 | DataType::Float32
788 | DataType::Float64 => {
789 if !datum.is_object() {
790 return Err(type_error(datum));
791 };
792
793 bson_extract_number(datum, type_expected)?
794 }
795
796 DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
797 if let serde_json::Value::Object(mp) = datum {
798 if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
799 bson_extract_timestamp(datum, type_expected)?
800 } else if mp.contains_key("$date") {
801 bson_extract_date(datum, type_expected)?
802 } else {
803 return Err(type_error(datum));
804 }
805 } else {
806 return Err(type_error(datum));
807 }
808 }
809 DataType::Decimal => {
810 if let serde_json::Value::Object(obj) = datum
811 && obj.contains_key("$numberDecimal")
812 && obj["$numberDecimal"].is_string()
813 {
814 let number = obj["$numberDecimal"].as_str().unwrap();
815
816 let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
817 AccessError::TypeError {
818 expected: type_expected.to_string(),
819 got: "unparsable string".into(),
820 value: number.to_owned(),
821 }
822 })?;
823 Some(ScalarImpl::Decimal(dec))
824 } else {
825 return Err(type_error(datum));
826 }
827 }
828
829 DataType::Bytea => {
830 if let serde_json::Value::Object(obj) = datum
831 && obj.contains_key("$binary")
832 && obj["$binary"].is_object()
833 {
834 use base64::Engine;
835
836 let binary = obj["$binary"].as_object().unwrap();
837
838 if !binary.contains_key("$base64")
839 || !binary["$base64"].is_string()
840 || !binary.contains_key("$subType")
841 || !binary["$subType"].is_string()
842 {
843 return Err(AccessError::TypeError {
844 expected: type_expected.to_string(),
845 got: "object".into(),
846 value: datum.to_string(),
847 });
848 }
849
850 let b64_str = binary["$base64"]
851 .as_str()
852 .ok_or_else(|| AccessError::TypeError {
853 expected: type_expected.to_string(),
854 got: "object".into(),
855 value: datum.to_string(),
856 })?;
857
858 let _type_str =
860 binary["$subType"]
861 .as_str()
862 .ok_or_else(|| AccessError::TypeError {
863 expected: type_expected.to_string(),
864 got: "object".into(),
865 value: datum.to_string(),
866 })?;
867
868 let bytes = base64::prelude::BASE64_STANDARD
869 .decode(b64_str)
870 .map_err(|_| AccessError::TypeError {
871 expected: "$binary object with $base64 string and $subType string field"
872 .to_owned(),
873 got: "string".to_owned(),
874 value: bson_doc.to_string(),
875 })?;
876 let bytea = ScalarImpl::Bytea(bytes.into());
877 Some(bytea)
878 } else {
879 return Err(type_error(datum));
880 }
881 }
882
883 DataType::Struct(struct_fields) => {
884 let mut datums = vec![];
885 for (field_name, field_type) in struct_fields.iter() {
886 let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
887 datums.push(field_datum);
888 }
889 let value = StructValue::new(datums);
890
891 Some(ScalarImpl::Struct(value))
892 }
893
894 DataType::List(list_type) => {
895 let elem_type = list_type.elem();
896 let Some(d_array) = datum.as_array() else {
897 return Err(type_error(datum));
898 };
899
900 let mut builder = elem_type.create_array_builder(d_array.len());
901 for item in d_array {
902 builder.append(extract_bson_field(elem_type, item, None)?);
903 }
904 Some(ScalarImpl::from(ListValue::new(builder.finish())))
905 }
906
907 _ => {
908 if let Some(field_name) = field {
909 unreachable!(
910 "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
911 )
912 } else {
913 let type_expected = type_expected.to_string();
914 unreachable!(
915 "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
916 )
917 }
918 }
919 };
920 Ok(field_datum)
921}
922
/// Converts an extended-JSON numeric wrapper object (e.g.
/// `{"$numberInt": "42"}`) into a datum of the expected numeric type.
///
/// The wrapper's payload is normally a string; `Infinity`, `-Infinity` and
/// `NaN` are accepted for float types, and integer types are range-checked.
/// Any other payload kind (null/array/object/bool/raw number) is a
/// `TypeError`.
fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    // Which extended-JSON wrapper key to look for, by target type.
    let field_name = match type_expected {
        DataType::Int16 => "$numberInt",
        DataType::Int32 => "$numberInt",
        DataType::Int64 => "$numberLong",
        DataType::Int256 => "$numberLong",
        DataType::Float32 => "$numberDouble",
        DataType::Float64 => "$numberDouble",
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    let datum = bson_doc.get(field_name);
    if datum.is_none() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    let datum = datum.unwrap();

    if datum.is_string() {
        let Some(num_str) = datum.as_str() else {
            return Err(AccessError::TypeError {
                expected: type_expected.to_string(),
                got: "string".into(),
                value: datum.to_string(),
            });
        };
        if [DataType::Float32, DataType::Float64].contains(type_expected) {
            // Special non-finite float spellings used by extended JSON.
            match (num_str, type_expected) {
                ("Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
                }
                ("Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
                }
                ("-Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
                }
                ("-Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
                }
                ("NaN", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
                }
                ("NaN", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
                }
                _ => {}
            }

            // Parse as f64 first, then narrow to f32 if requested.
            let parsed_num: f64 = match num_str.parse() {
                Ok(n) => n,
                Err(_e) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            if *type_expected == DataType::Float64 {
                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
            } else {
                let parsed_num = parsed_num as f32;
                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
            }
        }
        if *type_expected == DataType::Int256 {
            let parsed_num = match Int256::from_str(num_str) {
                Ok(n) => n,
                Err(_) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            return Ok(Some(ScalarImpl::Int256(parsed_num)));
        }

        // Remaining integer types: parse as i64, then range-check.
        let parsed_num: i64 = match num_str.parse() {
            Ok(n) => n,
            Err(_e) => {
                return Err(AccessError::TypeError {
                    expected: type_expected.to_string(),
                    got: "string".into(),
                    value: num_str.to_owned(),
                });
            }
        };
        match type_expected {
            DataType::Int16 => {
                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
            }
            DataType::Int32 => {
                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
            }
            DataType::Int64 => {
                return Ok(Some(ScalarImpl::Int64(parsed_num)));
            }
            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
        }
    }
    // Non-string payloads: report the actual kind found.
    if datum.is_null() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "null".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_array() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "array".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_object() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_boolean() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "boolean".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_number() {
        // A raw (unwrapped) JSON number is not a valid extended-JSON payload.
        let got_type = if datum.is_f64() { "f64" } else { "i64" };
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: got_type.into(),
            value: bson_doc.to_string(),
        });
    }

    Err(AccessError::TypeError {
        expected: type_expected.to_string(),
        got: "unknown".into(),
        value: bson_doc.to_string(),
    })
}
1094
1095fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1096 let datum = &bson_doc["$date"];
1111
1112 let type_error = || AccessError::TypeError {
1113 expected: type_expected.to_string(),
1114 got: match bson_doc {
1115 serde_json::Value::Null => "null",
1116 serde_json::Value::Bool(_) => "bool",
1117 serde_json::Value::Number(_) => "number",
1118 serde_json::Value::String(_) => "string",
1119 serde_json::Value::Array(_) => "array",
1120 serde_json::Value::Object(_) => "object",
1121 }
1122 .to_owned(),
1123 value: datum.to_string(),
1124 };
1125
1126 let millis = match datum {
1128 serde_json::Value::Object(obj)
1130 if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1131 {
1132 obj["$numberLong"]
1133 .as_str()
1134 .unwrap()
1135 .parse::<i64>()
1136 .map_err(|_| AccessError::TypeError {
1137 expected: "timestamp".into(),
1138 got: "object".into(),
1139 value: datum.to_string(),
1140 })?
1141 }
1142 serde_json::Value::String(s) => {
1144 let dt =
1145 chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1146 expected: "valid ISO-8601 date string".into(),
1147 got: "string".into(),
1148 value: datum.to_string(),
1149 })?;
1150 dt.timestamp_millis()
1151 }
1152
1153 serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1156 expected: "timestamp".into(),
1157 got: "number".into(),
1158 value: datum.to_string(),
1159 })?,
1160
1161 _ => return Err(type_error()),
1162 };
1163
1164 let datetime =
1165 chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1166 expected: "timestamp".into(),
1167 got: "object".into(),
1168 value: datum.to_string(),
1169 })?;
1170
1171 let res = match type_expected {
1172 DataType::Date => {
1173 let naive = datetime.naive_local();
1174 let dt = naive.date();
1175 Some(ScalarImpl::Date(dt.into()))
1176 }
1177 DataType::Time => {
1178 let naive = datetime.naive_local();
1179 let dt = naive.time();
1180 Some(ScalarImpl::Time(dt.into()))
1181 }
1182 DataType::Timestamp => {
1183 let naive = datetime.naive_local();
1184 let dt = Timestamp::from(naive);
1185 Some(ScalarImpl::Timestamp(dt))
1186 }
1187 DataType::Timestamptz => {
1188 let dt = datetime.into();
1189 Some(ScalarImpl::Timestamptz(dt))
1190 }
1191 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1192 };
1193 Ok(res)
1194}
1195
/// Converts a BSON `$timestamp` value (`{"t": seconds, "i": increment}`)
/// into a date/time datum of `type_expected`.
///
/// Only the `t` (seconds since epoch) component is used; `i` is validated
/// to be a present unsigned integer but otherwise ignored.
fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    let Some(obj) = bson_doc["$timestamp"].as_object() else {
        return Err(AccessError::TypeError {
            expected: "timestamp".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    };

    // Both components must be present and unsigned.
    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
    {
        return Err(AccessError::TypeError {
            expected: "timestamp with valid seconds since epoch".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    // `as_i64` fails for values above i64::MAX even though `is_u64` passed.
    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
        expected: "timestamp with valid seconds since epoch".into(),
        got: "object".into(),
        value: bson_doc.to_string(),
    })?;

    let chrono_datetime =
        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: bson_doc.to_string(),
        })?;

    // Project the UTC datetime onto the requested type.
    let res = match type_expected {
        DataType::Date => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.date();
            Some(ScalarImpl::Date(dt.into()))
        }
        DataType::Time => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.time();
            Some(ScalarImpl::Time(dt.into()))
        }
        DataType::Timestamp => {
            let naive = chrono_datetime.naive_local();
            let dt = Timestamp::from(naive);
            Some(ScalarImpl::Timestamp(dt))
        }
        DataType::Timestamptz => {
            let dt = chrono_datetime.into();
            Some(ScalarImpl::Timestamptz(dt))
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    Ok(res)
}
1269
1270impl<A> MongoJsonAccess<A> {
1271 pub fn new(accessor: A, strong_schema: bool) -> Self {
1272 Self {
1273 accessor,
1274 strong_schema,
1275 }
1276 }
1277}
1278
impl<A> Access for MongoJsonAccess<A>
where
    A: Access,
{
    /// Translates column paths into BSON extraction over the wrapped
    /// accessor's JSON payload.
    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
        match path {
            // `_id` under a row image: decode the BSON id from the payload.
            ["after" | "before", "_id"] => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                } else {
                    // Not a jsonb payload — report the id as undefined.
                    Err(AccessError::Undefined {
                        name: "_id".to_owned(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            // Weak schema: the whole row image is exposed as one `payload`
            // JSONB column.
            ["after" | "before", "payload"] if !self.strong_schema => {
                self.access(&[path[0]], &DataType::Jsonb)
            }

            // Strong schema: decode the named field to its typed column.
            ["after" | "before", field] if self.strong_schema => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
                } else {
                    Err(AccessError::Undefined {
                        name: field.to_string(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            // Top-level `_id` (message key): try directly, then fall back to
            // decoding the `id` field as a BSON document.
            ["_id"] => {
                let ret = self.accessor.access(path, type_expected);
                if matches!(ret, Err(AccessError::Undefined { .. })) {
                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                    } else {
                        Err(AccessError::Undefined {
                            name: "_id".to_owned(),
                            path: "id".to_owned(),
                        })?
                    }
                } else {
                    ret
                }
            }
            // Anything else is delegated to the wrapped accessor unchanged.
            _ => self.accessor.access(path, type_expected),
        }
    }
}
1338}