1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::types::{
20 DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
21 Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
22};
23use risingwave_connector_codec::decoder::AccessExt;
24use risingwave_pb::plan_common::additional_column::ColumnType;
25use thiserror_ext::AsReport;
26
27use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
28use crate::parser::TransactionControl;
29use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
30use crate::parser::schema_change::TableChangeType;
31use crate::source::cdc::build_cdc_table_id;
32use crate::source::cdc::external::mysql::{
33 mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
34};
35use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
36use crate::source::{ConnectorProperties, SourceColumnDesc};
37
/// Returns `true` when `default_value_expression`, ignoring surrounding whitespace,
/// begins with `nextval(`.
///
/// An empty or whitespace-only expression yields `false`.
fn is_bigserial_default(default_value_expression: &str) -> bool {
    // A blank string can never carry the prefix, so no separate emptiness check is needed.
    let trimmed = default_value_expression.trim();
    trimmed.starts_with("nextval(")
}
62
/// A Debezium change event viewed through optional accessors over the message key and the
/// message value.
///
/// The constructors in this file assert that at least one of the two accessors is present.
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message value; `op`, `before`, `after` and `source` are read from it.
    value_accessor: Option<A>,
    /// Accessor over the message key; deletes fall back to it when there is no value accessor.
    key_accessor: Option<A>,
    /// When set, a delete reads the `_id` column from the key accessor.
    is_mongodb: bool,
}
101
// First path segment used when reading a column: `before` for deletes, `after` for upserts.
const BEFORE: &str = "before";
const AFTER: &str = "after";

// Field of a schema change message that carries the upstream DDL text.
const UPSTREAM_DDL: &str = "ddl";
// The `source` object and the sub-fields read from it for additional columns.
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

// Field holding the operation code of a change event.
const OP: &str = "op";
// Fields of a transaction metadata message.
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";

// Field of a schema change message that lists the per-table changes.
pub const TABLE_CHANGES: &str = "tableChanges";

// Operation codes: read/create/update are treated as upserts, delete as a delete.
pub const DEBEZIUM_READ_OP: &str = "r";
pub const DEBEZIUM_CREATE_OP: &str = "c";
pub const DEBEZIUM_UPDATE_OP: &str = "u";
pub const DEBEZIUM_DELETE_OP: &str = "d";

// Transaction status values; `END` is mapped to a commit.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
126
127pub fn parse_transaction_meta(
128 accessor: &impl Access,
129 connector_props: &ConnectorProperties,
130) -> AccessResult<TransactionControl> {
131 if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
132 accessor
133 .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
134 .to_datum_ref(),
135 accessor
136 .access(&[TRANSACTION_ID], &DataType::Varchar)?
137 .to_datum_ref(),
138 ) {
139 match status {
144 DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
145 ConnectorProperties::PostgresCdc(_) => {
146 let (tx_id, _) = id.split_once(':').unwrap();
147 return Ok(TransactionControl::Begin { id: tx_id.into() });
148 }
149 ConnectorProperties::MysqlCdc(_) => {
150 return Ok(TransactionControl::Begin { id: id.into() });
151 }
152 ConnectorProperties::SqlServerCdc(_) => {
153 return Ok(TransactionControl::Begin { id: id.into() });
154 }
155 _ => {}
156 },
157 DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
158 ConnectorProperties::PostgresCdc(_) => {
159 let (tx_id, _) = id.split_once(':').unwrap();
160 return Ok(TransactionControl::Commit { id: tx_id.into() });
161 }
162 ConnectorProperties::MysqlCdc(_) => {
163 return Ok(TransactionControl::Commit { id: id.into() });
164 }
165 ConnectorProperties::SqlServerCdc(_) => {
166 return Ok(TransactionControl::Commit { id: id.into() });
167 }
168 _ => {}
169 },
170 _ => {}
171 }
172 }
173
174 Err(AccessError::Undefined {
175 name: "transaction status".into(),
176 path: TRANSACTION_STATUS.into(),
177 })
178}
179
/// Reads field `$field` from the JSONB object `$col` and converts it with the
/// `as_<$as_type>` accessor (the method name is assembled with `paste!`).
///
/// Both the field lookup and the conversion are unwrapped, so the expansion panics when the
/// field is absent or the conversion fails.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}
187
188pub fn parse_schema_change(
192 accessor: &impl Access,
193 source_id: u32,
194 source_name: &str,
195 connector_props: &ConnectorProperties,
196) -> AccessResult<SchemaChangeEnvelope> {
197 let mut schema_changes = vec![];
198
199 let upstream_ddl: String = accessor
200 .access(&[UPSTREAM_DDL], &DataType::Varchar)?
201 .to_owned_datum()
202 .unwrap()
203 .as_utf8()
204 .to_string();
205
206 if let Some(ScalarRefImpl::List(table_changes)) = accessor
207 .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
208 .to_datum_ref()
209 {
210 for datum in table_changes.iter() {
211 let jsonb = match datum {
212 Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
213 _ => unreachable!(""),
214 };
215 let id: String = jsonb_access_field!(jsonb, "id", string);
216 let ty = jsonb_access_field!(jsonb, "type", string);
217
218 let table_name = id.trim_matches('"').to_owned();
219 let ddl_type: TableChangeType = ty.as_str().into();
220 if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
221 tracing::debug!("skip table schema change for create/drop command");
222 continue;
223 }
224
225 let mut column_descs: Vec<ColumnDesc> = vec![];
226 if let Some(table) = jsonb.access_object_field("table")
227 && let Some(columns) = table.access_object_field("columns")
228 {
229 for col in columns.array_elements().unwrap() {
230 let name = jsonb_access_field!(col, "name", string);
231 let type_name = jsonb_access_field!(col, "typeName", string);
232 let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
234 let data_type = match *connector_props {
235 ConnectorProperties::PostgresCdc(_) => {
236 let ty = type_name_to_pg_type(type_name.as_str());
237 if is_enum {
238 tracing::debug!(target: "auto_schema_change",
239 "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
240 DataType::Varchar
241 } else {
242 match ty {
243 Some(ty) => match pg_type_to_rw_type(&ty) {
244 Ok(data_type) => data_type,
245 Err(err) => {
246 tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
247 return Err(AccessError::CdcAutoSchemaChangeError {
248 ty: type_name,
249 table_name: format!(
250 "{}.{}",
251 source_name, table_name
252 ),
253 });
254 }
255 },
256 None => {
257 return Err(AccessError::CdcAutoSchemaChangeError {
258 ty: type_name,
259 table_name: format!("{}.{}", source_name, table_name),
260 });
261 }
262 }
263 }
264 }
265 ConnectorProperties::MysqlCdc(_) => {
266 let ty = type_name_to_mysql_type(type_name.as_str());
267 match ty {
268 Some(ty) => match mysql_type_to_rw_type(&ty) {
269 Ok(data_type) => data_type,
270 Err(err) => {
271 tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
272 return Err(AccessError::CdcAutoSchemaChangeError {
273 ty: type_name,
274 table_name: format!("{}.{}", source_name, table_name),
275 });
276 }
277 },
278 None => {
279 return Err(AccessError::CdcAutoSchemaChangeError {
280 ty: type_name,
281 table_name: format!("{}.{}", source_name, table_name),
282 });
283 }
284 }
285 }
286 _ => {
287 unreachable!()
288 }
289 };
290
291 let column_desc = match col.access_object_field("defaultValueExpression") {
293 Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
294 let default_val_expr_str = default_val_expr_str.as_str().unwrap();
295 if is_bigserial_default(default_val_expr_str) {
297 tracing::warn!(target: "auto_schema_change",
298 "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
299 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
300 } else {
301 let value_text: Option<String>;
302 match *connector_props {
303 ConnectorProperties::PostgresCdc(_) => {
304 match default_val_expr_str
307 .split("::")
308 .map(|s| s.trim_matches('\''))
309 .next()
310 {
311 None => {
312 value_text = None;
313 }
314 Some(val_text) => {
315 value_text = Some(val_text.to_owned());
316 }
317 }
318 }
319 ConnectorProperties::MysqlCdc(_) => {
320 if data_type == DataType::Timestamptz {
323 value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
324 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
325 AccessError::TypeError {
326 expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
327 got: data_type.to_string(),
328 value: default_val_expr_str.to_owned(),
329 }
330 })?);
331 } else {
332 value_text = Some(default_val_expr_str.to_owned());
333 }
334 }
335 _ => {
336 unreachable!("connector doesn't support schema change")
337 }
338 }
339
340 let snapshot_value: Datum = if let Some(value_text) = value_text {
341 Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
342 |err| {
343 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
344 AccessError::TypeError {
345 expected: "constant expression".into(),
346 got: data_type.to_string(),
347 value: value_text,
348 }
349 },
350 )?)
351 } else {
352 None
353 };
354
355 if snapshot_value.is_none() {
356 tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
357 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
358 } else {
359 ColumnDesc::named_with_default_value(
360 name,
361 ColumnId::placeholder(),
362 data_type,
363 snapshot_value,
364 )
365 }
366 }
367 }
368 _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
369 };
370 column_descs.push(column_desc);
371 }
372 }
373
374 let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
376 schema_changes.push(TableSchemaChange {
377 cdc_table_id,
378 columns: column_descs
379 .into_iter()
380 .map(|column_desc| ColumnCatalog {
381 column_desc,
382 is_hidden: false,
383 })
384 .collect_vec(),
385 change_type: ty.as_str().into(),
386 upstream_ddl: upstream_ddl.clone(),
387 });
388 }
389
390 Ok(SchemaChangeEnvelope {
391 table_changes: schema_changes,
392 })
393 } else {
394 Err(AccessError::Undefined {
395 name: "table schema change".into(),
396 path: TABLE_CHANGES.into(),
397 })
398 }
399}
400
401impl<A> DebeziumChangeEvent<A>
402where
403 A: Access,
404{
405 pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
407 assert!(key_accessor.is_some() || value_accessor.is_some());
408 Self {
409 value_accessor,
410 key_accessor,
411 is_mongodb: false,
412 }
413 }
414
415 pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
416 assert!(key_accessor.is_some() || value_accessor.is_some());
417 Self {
418 value_accessor,
419 key_accessor,
420 is_mongodb: true,
421 }
422 }
423
424 pub(crate) fn transaction_control(
428 &self,
429 connector_props: &ConnectorProperties,
430 ) -> Option<TransactionControl> {
431 self.value_accessor
434 .as_ref()
435 .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
436 }
437}
438
439impl<A> ChangeEvent for DebeziumChangeEvent<A>
440where
441 A: Access,
442{
443 fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
444 match self.op()? {
445 ChangeEventOperation::Delete => {
446 if self.is_mongodb && desc.name == "_id" {
449 return self
450 .key_accessor
451 .as_ref()
452 .expect("key_accessor must be provided for delete operation")
453 .access(&[&desc.name], &desc.data_type);
454 }
455
456 if let Some(va) = self.value_accessor.as_ref() {
457 va.access(&[BEFORE, &desc.name], &desc.data_type)
458 } else {
459 self.key_accessor
460 .as_ref()
461 .unwrap()
462 .access(&[&desc.name], &desc.data_type)
463 }
464 }
465
466 ChangeEventOperation::Upsert => {
468 desc.additional_column.column_type.as_ref().map_or_else(
470 || {
471 self.value_accessor
472 .as_ref()
473 .expect("value_accessor must be provided for upsert operation")
474 .access(&[AFTER, &desc.name], &desc.data_type)
475 },
476 |additional_column_type| {
477 match *additional_column_type {
478 ColumnType::Timestamp(_) => {
479 let ts_ms = self
481 .value_accessor
482 .as_ref()
483 .expect("value_accessor must be provided for upsert operation")
484 .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
485 Ok(DatumCow::Owned(ts_ms.map(|scalar| {
486 Timestamptz::from_millis(scalar.into_int64())
487 .expect("source.ts_ms must in millisecond")
488 .to_scalar_value()
489 })))
490 }
491 ColumnType::DatabaseName(_) => self
492 .value_accessor
493 .as_ref()
494 .expect("value_accessor must be provided for upsert operation")
495 .access(&[SOURCE, SOURCE_DB], &desc.data_type),
496 ColumnType::SchemaName(_) => self
497 .value_accessor
498 .as_ref()
499 .expect("value_accessor must be provided for upsert operation")
500 .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
501 ColumnType::TableName(_) => self
502 .value_accessor
503 .as_ref()
504 .expect("value_accessor must be provided for upsert operation")
505 .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
506 ColumnType::CollectionName(_) => self
507 .value_accessor
508 .as_ref()
509 .expect("value_accessor must be provided for upsert operation")
510 .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
511 _ => Err(AccessError::UnsupportedAdditionalColumn {
512 name: desc.name.clone(),
513 }),
514 }
515 },
516 )
517 }
518 }
519 }
520
521 fn op(&self) -> Result<ChangeEventOperation, AccessError> {
522 if let Some(accessor) = &self.value_accessor {
523 if let Some(ScalarRefImpl::Utf8(op)) =
524 accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
525 {
526 match op {
527 DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
528 return Ok(ChangeEventOperation::Upsert);
529 }
530 DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
531 _ => (),
532 }
533 }
534 Err(super::AccessError::Undefined {
535 name: "op".into(),
536 path: Default::default(),
537 })
538 } else {
539 Ok(ChangeEventOperation::Delete)
540 }
541 }
542}
543
/// Wraps another accessor and adds MongoDB-specific handling for the `_id`, `after` and
/// `before` paths.
pub struct MongoJsonAccess<A> {
    /// The wrapped accessor that every lookup is ultimately delegated to.
    accessor: A,
    /// When true, `after.<field>` / `before.<field>` are decoded field by field from the
    /// document; when false, `after.payload` / `before.payload` return the whole document
    /// as JSONB.
    strong_schema: bool,
}
551
552pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
553 let id_field = if let Some(value) = bson_doc.get("_id") {
554 value
555 } else {
556 bson_doc
557 };
558
559 let type_error = || AccessError::TypeError {
560 expected: id_type.to_string(),
561 got: match id_field {
562 serde_json::Value::Null => "null",
563 serde_json::Value::Bool(_) => "bool",
564 serde_json::Value::Number(_) => "number",
565 serde_json::Value::String(_) => "string",
566 serde_json::Value::Array(_) => "array",
567 serde_json::Value::Object(_) => "object",
568 }
569 .to_owned(),
570 value: id_field.to_string(),
571 };
572
573 let id: Datum = match id_type {
574 DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
575 DataType::Varchar => match id_field {
576 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
577 serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
578 obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
579 )),
580 _ => return Err(type_error()),
581 },
582 DataType::Int32 => {
583 if let serde_json::Value::Object(obj) = id_field
584 && obj.contains_key("$numberInt")
585 {
586 let int_str = obj["$numberInt"].as_str().unwrap_or_default();
587 Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
588 } else {
589 return Err(type_error());
590 }
591 }
592 DataType::Int64 => {
593 if let serde_json::Value::Object(obj) = id_field
594 && obj.contains_key("$numberLong")
595 {
596 let int_str = obj["$numberLong"].as_str().unwrap_or_default();
597 Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
598 } else {
599 return Err(type_error());
600 }
601 }
602 _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
603 };
604 Ok(id)
605}
606
607pub fn extract_bson_field(
620 type_expected: &DataType,
621 bson_doc: &serde_json::Value,
622 field: Option<&str>,
623) -> AccessResult {
624 let type_error = |datum: &serde_json::Value| AccessError::TypeError {
625 expected: type_expected.to_string(),
626 got: match bson_doc {
627 serde_json::Value::Null => "null",
628 serde_json::Value::Bool(_) => "bool",
629 serde_json::Value::Number(_) => "number",
630 serde_json::Value::String(_) => "string",
631 serde_json::Value::Array(_) => "array",
632 serde_json::Value::Object(_) => "object",
633 }
634 .to_owned(),
635 value: datum.to_string(),
636 };
637
638 let datum = if let Some(field) = field {
639 let Some(bson_doc) = bson_doc.get(field) else {
640 return Err(type_error(bson_doc));
641 };
642 bson_doc
643 } else {
644 bson_doc
645 };
646
647 if datum.is_null() {
648 return Ok(None);
649 }
650
651 let field_datum: Datum = match type_expected {
652 DataType::Boolean => {
653 if datum.is_boolean() {
654 Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
655 } else {
656 return Err(type_error(datum));
657 }
658 }
659 DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
660 DataType::Varchar => match datum {
661 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
662 serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
663 obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
664 }
665 _ => return Err(type_error(datum)),
666 },
667 DataType::Int16
668 | DataType::Int32
669 | DataType::Int64
670 | DataType::Int256
671 | DataType::Float32
672 | DataType::Float64 => {
673 if !datum.is_object() {
674 return Err(type_error(datum));
675 };
676
677 bson_extract_number(datum, type_expected)?
678 }
679
680 DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
681 if let serde_json::Value::Object(mp) = datum {
682 if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
683 bson_extract_timestamp(datum, type_expected)?
684 } else if mp.contains_key("$date") {
685 bson_extract_date(datum, type_expected)?
686 } else {
687 return Err(type_error(datum));
688 }
689 } else {
690 return Err(type_error(datum));
691 }
692 }
693 DataType::Decimal => {
694 if let serde_json::Value::Object(obj) = datum
695 && obj.contains_key("$numberDecimal")
696 && obj["$numberDecimal"].is_string()
697 {
698 let number = obj["$numberDecimal"].as_str().unwrap();
699
700 let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
701 AccessError::TypeError {
702 expected: type_expected.to_string(),
703 got: "unparsable string".into(),
704 value: number.to_owned(),
705 }
706 })?;
707 Some(ScalarImpl::Decimal(dec))
708 } else {
709 return Err(type_error(datum));
710 }
711 }
712
713 DataType::Bytea => {
714 if let serde_json::Value::Object(obj) = datum
715 && obj.contains_key("$binary")
716 && obj["$binary"].is_object()
717 {
718 use base64::Engine;
719
720 let binary = obj["$binary"].as_object().unwrap();
721
722 if !binary.contains_key("$base64")
723 || !binary["$base64"].is_string()
724 || !binary.contains_key("$subType")
725 || !binary["$subType"].is_string()
726 {
727 return Err(AccessError::TypeError {
728 expected: type_expected.to_string(),
729 got: "object".into(),
730 value: datum.to_string(),
731 });
732 }
733
734 let b64_str = binary["$base64"]
735 .as_str()
736 .ok_or_else(|| AccessError::TypeError {
737 expected: type_expected.to_string(),
738 got: "object".into(),
739 value: datum.to_string(),
740 })?;
741
742 let _type_str =
744 binary["$subType"]
745 .as_str()
746 .ok_or_else(|| AccessError::TypeError {
747 expected: type_expected.to_string(),
748 got: "object".into(),
749 value: datum.to_string(),
750 })?;
751
752 let bytes = base64::prelude::BASE64_STANDARD
753 .decode(b64_str)
754 .map_err(|_| AccessError::TypeError {
755 expected: "$binary object with $base64 string and $subType string field"
756 .to_owned(),
757 got: "string".to_owned(),
758 value: bson_doc.to_string(),
759 })?;
760 let bytea = ScalarImpl::Bytea(bytes.into());
761 Some(bytea)
762 } else {
763 return Err(type_error(datum));
764 }
765 }
766
767 DataType::Struct(struct_fields) => {
768 let mut datums = vec![];
769 for (field_name, field_type) in struct_fields.iter() {
770 let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
771 datums.push(field_datum);
772 }
773 let value = StructValue::new(datums);
774
775 Some(ScalarImpl::Struct(value))
776 }
777
778 DataType::List(list_type) => {
779 let elem_type = list_type.elem();
780 let Some(d_array) = datum.as_array() else {
781 return Err(type_error(datum));
782 };
783
784 let mut builder = elem_type.create_array_builder(d_array.len());
785 for item in d_array {
786 builder.append(extract_bson_field(elem_type, item, None)?);
787 }
788 Some(ScalarImpl::from(ListValue::new(builder.finish())))
789 }
790
791 _ => {
792 if let Some(field_name) = field {
793 unreachable!(
794 "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
795 )
796 } else {
797 let type_expected = type_expected.to_string();
798 unreachable!(
799 "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
800 )
801 }
802 }
803 };
804 Ok(field_datum)
805}
806
807fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
808 let field_name = match type_expected {
809 DataType::Int16 => "$numberInt",
810 DataType::Int32 => "$numberInt",
811 DataType::Int64 => "$numberLong",
812 DataType::Int256 => "$numberLong",
813 DataType::Float32 => "$numberDouble",
814 DataType::Float64 => "$numberDouble",
815 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
816 };
817
818 let datum = bson_doc.get(field_name);
819 if datum.is_none() {
820 return Err(AccessError::TypeError {
821 expected: type_expected.to_string(),
822 got: "object".into(),
823 value: bson_doc.to_string(),
824 });
825 }
826
827 let datum = datum.unwrap();
828
829 if datum.is_string() {
830 let Some(num_str) = datum.as_str() else {
831 return Err(AccessError::TypeError {
832 expected: type_expected.to_string(),
833 got: "string".into(),
834 value: datum.to_string(),
835 });
836 };
837 if [DataType::Float32, DataType::Float64].contains(type_expected) {
839 match (num_str, type_expected) {
840 ("Infinity", DataType::Float64) => {
841 return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
842 }
843 ("Infinity", DataType::Float32) => {
844 return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
845 }
846 ("-Infinity", DataType::Float64) => {
847 return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
848 }
849 ("-Infinity", DataType::Float32) => {
850 return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
851 }
852 ("NaN", DataType::Float64) => {
853 return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
854 }
855 ("NaN", DataType::Float32) => {
856 return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
857 }
858 _ => {}
859 }
860
861 let parsed_num: f64 = match num_str.parse() {
862 Ok(n) => n,
863 Err(_e) => {
864 return Err(AccessError::TypeError {
865 expected: type_expected.to_string(),
866 got: "string".into(),
867 value: num_str.to_owned(),
868 });
869 }
870 };
871 if *type_expected == DataType::Float64 {
872 return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
873 } else {
874 let parsed_num = parsed_num as f32;
875 return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
876 }
877 }
878 if *type_expected == DataType::Int256 {
880 let parsed_num = match Int256::from_str(num_str) {
881 Ok(n) => n,
882 Err(_) => {
883 return Err(AccessError::TypeError {
884 expected: type_expected.to_string(),
885 got: "string".into(),
886 value: num_str.to_owned(),
887 });
888 }
889 };
890 return Ok(Some(ScalarImpl::Int256(parsed_num)));
891 }
892
893 let parsed_num: i64 = match num_str.parse() {
895 Ok(n) => n,
896 Err(_e) => {
897 return Err(AccessError::TypeError {
898 expected: type_expected.to_string(),
899 got: "string".into(),
900 value: num_str.to_owned(),
901 });
902 }
903 };
904 match type_expected {
905 DataType::Int16 => {
906 if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
907 return Err(AccessError::TypeError {
908 expected: type_expected.to_string(),
909 got: "string".into(),
910 value: num_str.to_owned(),
911 });
912 }
913 return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
914 }
915 DataType::Int32 => {
916 if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
917 return Err(AccessError::TypeError {
918 expected: type_expected.to_string(),
919 got: "string".into(),
920 value: num_str.to_owned(),
921 });
922 }
923 return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
924 }
925 DataType::Int64 => {
926 return Ok(Some(ScalarImpl::Int64(parsed_num)));
927 }
928 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
929 }
930 }
931 if datum.is_null() {
932 return Err(AccessError::TypeError {
933 expected: type_expected.to_string(),
934 got: "null".into(),
935 value: bson_doc.to_string(),
936 });
937 }
938
939 if datum.is_array() {
940 return Err(AccessError::TypeError {
941 expected: type_expected.to_string(),
942 got: "array".to_owned(),
943 value: datum.to_string(),
944 });
945 }
946
947 if datum.is_object() {
948 return Err(AccessError::TypeError {
949 expected: type_expected.to_string(),
950 got: "object".to_owned(),
951 value: datum.to_string(),
952 });
953 }
954
955 if datum.is_boolean() {
956 return Err(AccessError::TypeError {
957 expected: type_expected.to_string(),
958 got: "boolean".into(),
959 value: bson_doc.to_string(),
960 });
961 }
962
963 if datum.is_number() {
964 let got_type = if datum.is_f64() { "f64" } else { "i64" };
965 return Err(AccessError::TypeError {
966 expected: type_expected.to_string(),
967 got: got_type.into(),
968 value: bson_doc.to_string(),
969 });
970 }
971
972 Err(AccessError::TypeError {
973 expected: type_expected.to_string(),
974 got: "unknown".into(),
975 value: bson_doc.to_string(),
976 })
977}
978
979fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
980 let datum = &bson_doc["$date"];
995
996 let type_error = || AccessError::TypeError {
997 expected: type_expected.to_string(),
998 got: match bson_doc {
999 serde_json::Value::Null => "null",
1000 serde_json::Value::Bool(_) => "bool",
1001 serde_json::Value::Number(_) => "number",
1002 serde_json::Value::String(_) => "string",
1003 serde_json::Value::Array(_) => "array",
1004 serde_json::Value::Object(_) => "object",
1005 }
1006 .to_owned(),
1007 value: datum.to_string(),
1008 };
1009
1010 let millis = match datum {
1012 serde_json::Value::Object(obj)
1014 if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1015 {
1016 obj["$numberLong"]
1017 .as_str()
1018 .unwrap()
1019 .parse::<i64>()
1020 .map_err(|_| AccessError::TypeError {
1021 expected: "timestamp".into(),
1022 got: "object".into(),
1023 value: datum.to_string(),
1024 })?
1025 }
1026 serde_json::Value::String(s) => {
1028 let dt =
1029 chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1030 expected: "valid ISO-8601 date string".into(),
1031 got: "string".into(),
1032 value: datum.to_string(),
1033 })?;
1034 dt.timestamp_millis()
1035 }
1036
1037 serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1040 expected: "timestamp".into(),
1041 got: "number".into(),
1042 value: datum.to_string(),
1043 })?,
1044
1045 _ => return Err(type_error()),
1046 };
1047
1048 let datetime =
1049 chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1050 expected: "timestamp".into(),
1051 got: "object".into(),
1052 value: datum.to_string(),
1053 })?;
1054
1055 let res = match type_expected {
1056 DataType::Date => {
1057 let naive = datetime.naive_local();
1058 let dt = naive.date();
1059 Some(ScalarImpl::Date(dt.into()))
1060 }
1061 DataType::Time => {
1062 let naive = datetime.naive_local();
1063 let dt = naive.time();
1064 Some(ScalarImpl::Time(dt.into()))
1065 }
1066 DataType::Timestamp => {
1067 let naive = datetime.naive_local();
1068 let dt = Timestamp::from(naive);
1069 Some(ScalarImpl::Timestamp(dt))
1070 }
1071 DataType::Timestamptz => {
1072 let dt = datetime.into();
1073 Some(ScalarImpl::Timestamptz(dt))
1074 }
1075 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1076 };
1077 Ok(res)
1078}
1079
1080fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1081 let Some(obj) = bson_doc["$timestamp"].as_object() else {
1099 return Err(AccessError::TypeError {
1100 expected: "timestamp".into(),
1101 got: "object".into(),
1102 value: bson_doc.to_string(),
1103 });
1104 };
1105
1106 if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1107 {
1108 return Err(AccessError::TypeError {
1109 expected: "timestamp with valid seconds since epoch".into(),
1110 got: "object".into(),
1111 value: bson_doc.to_string(),
1112 });
1113 }
1114
1115 let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1116 expected: "timestamp with valid seconds since epoch".into(),
1117 got: "object".into(),
1118 value: bson_doc.to_string(),
1119 })?;
1120
1121 let chrono_datetime =
1122 chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1123 expected: type_expected.to_string(),
1124 got: "object".to_owned(),
1125 value: bson_doc.to_string(),
1126 })?;
1127
1128 let res = match type_expected {
1129 DataType::Date => {
1130 let naive = chrono_datetime.naive_local();
1131 let dt = naive.date();
1132 Some(ScalarImpl::Date(dt.into()))
1133 }
1134 DataType::Time => {
1135 let naive = chrono_datetime.naive_local();
1136 let dt = naive.time();
1137 Some(ScalarImpl::Time(dt.into()))
1138 }
1139 DataType::Timestamp => {
1140 let naive = chrono_datetime.naive_local();
1141 let dt = Timestamp::from(naive);
1142 Some(ScalarImpl::Timestamp(dt))
1143 }
1144 DataType::Timestamptz => {
1145 let dt = chrono_datetime.into();
1146 Some(ScalarImpl::Timestamptz(dt))
1147 }
1148 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1149 };
1150
1151 Ok(res)
1152}
1153
impl<A> MongoJsonAccess<A> {
    /// Wraps `accessor`, storing `strong_schema` as given.
    pub fn new(accessor: A, strong_schema: bool) -> Self {
        Self {
            accessor,
            strong_schema,
        }
    }
}
1162
1163impl<A> Access for MongoJsonAccess<A>
1164where
1165 A: Access,
1166{
1167 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1168 match path {
1169 ["after" | "before", "_id"] => {
1170 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1171 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1172 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1173 } else {
1174 Err(AccessError::Undefined {
1176 name: "_id".to_owned(),
1177 path: path[0].to_owned(),
1178 })?
1179 }
1180 }
1181
1182 ["after" | "before", "payload"] if !self.strong_schema => {
1183 self.access(&[path[0]], &DataType::Jsonb)
1184 }
1185
1186 ["after" | "before", field] if self.strong_schema => {
1187 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1188 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1189 Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1190 } else {
1191 Err(AccessError::Undefined {
1193 name: field.to_string(),
1194 path: path[0].to_owned(),
1195 })?
1196 }
1197 }
1198
1199 ["_id"] => {
1203 let ret = self.accessor.access(path, type_expected);
1204 if matches!(ret, Err(AccessError::Undefined { .. })) {
1205 let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1206 if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1207 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1208 } else {
1209 Err(AccessError::Undefined {
1211 name: "_id".to_owned(),
1212 path: "id".to_owned(),
1213 })?
1214 }
1215 } else {
1216 ret
1217 }
1218 }
1219 _ => self.accessor.access(path, type_expected),
1220 }
1221 }
1222}