use std::str::FromStr;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::{
    DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
    Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
};
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::plan_common::additional_column::ColumnType;
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
use crate::parser::TransactionControl;
use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
use crate::parser::schema_change::TableChangeType;
use crate::source::cdc::build_cdc_table_id;
use crate::source::cdc::external::mysql::{
    mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
};
use crate::source::{ConnectorProperties, SourceColumnDesc};

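/// A Debezium change event, wrapping the (optional) key and value accessors of the
/// Debezium envelope (`before` / `after` / `source` / `op`).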
pub struct DebeziumChangeEvent<A> {
    value_accessor: Option<A>,
    key_accessor: Option<A>,
    is_mongodb: bool,
}

const BEFORE: &str = "before";
const AFTER: &str = "after";

const UPSTREAM_DDL: &str = "ddl";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

const OP: &str = "op";
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";

pub const TABLE_CHANGES: &str = "tableChanges";

pub const DEBEZIUM_READ_OP: &str = "r";
pub const DEBEZIUM_CREATE_OP: &str = "c";
pub const DEBEZIUM_UPDATE_OP: &str = "u";
pub const DEBEZIUM_DELETE_OP: &str = "d";

pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";

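/// Parses a Debezium transaction metadata message into a [`TransactionControl`].
///
/// Returns `AccessError::Undefined` if the message is not a `BEGIN`/`END` transaction
/// marker, or if the connector does not support transaction metadata.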
pub fn parse_transaction_meta(
    accessor: &impl Access,
    connector_props: &ConnectorProperties,
) -> AccessResult<TransactionControl> {
    if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
        accessor
            .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
            .to_datum_ref(),
        accessor
            .access(&[TRANSACTION_ID], &DataType::Varchar)?
            .to_datum_ref(),
    ) {
        match status {
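            // The Postgres connector encodes the transaction id as `txid:lsn`; only the
            // leading txid identifies the transaction. MySQL and SQL Server ids are used as-is.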
            DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
                ConnectorProperties::PostgresCdc(_) => {
                    let (tx_id, _) = id.split_once(':').unwrap();
                    return Ok(TransactionControl::Begin { id: tx_id.into() });
                }
                ConnectorProperties::MysqlCdc(_) => {
                    return Ok(TransactionControl::Begin { id: id.into() });
                }
                ConnectorProperties::SqlServerCdc(_) => {
                    return Ok(TransactionControl::Begin { id: id.into() });
                }
                _ => {}
            },
            DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
                ConnectorProperties::PostgresCdc(_) => {
                    let (tx_id, _) = id.split_once(':').unwrap();
                    return Ok(TransactionControl::Commit { id: tx_id.into() });
                }
                ConnectorProperties::MysqlCdc(_) => {
                    return Ok(TransactionControl::Commit { id: id.into() });
                }
                ConnectorProperties::SqlServerCdc(_) => {
                    return Ok(TransactionControl::Commit { id: id.into() });
                }
                _ => {}
            },
            _ => {}
        }
    }

    Err(AccessError::Undefined {
        name: "transaction status".into(),
        path: TRANSACTION_STATUS.into(),
    })
}

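/// Reads `$field` from the jsonb object `$col` and converts it with `as_<$as_type>`,
/// panicking if the field is missing or has an unexpected type.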
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}

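/// Parses a Debezium schema change message (DDL event) into a [`SchemaChangeEnvelope`].
///
/// `tableChanges` entries for `CREATE`/`DROP` are skipped; only the column definitions of
/// altered tables are converted into [`ColumnCatalog`]s, including default values when
/// they can be parsed.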
pub fn parse_schema_change(
    accessor: &impl Access,
    source_id: u32,
    connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
    let mut schema_changes = vec![];

    let upstream_ddl: String = accessor
        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
        .to_owned_datum()
        .unwrap()
        .as_utf8()
        .to_string();

    if let Some(ScalarRefImpl::List(table_changes)) = accessor
        .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
        .to_datum_ref()
    {
        for datum in table_changes.iter() {
            let jsonb = match datum {
                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
                _ => unreachable!("every `tableChanges` element must be a jsonb value"),
            };

            let id = jsonb_access_field!(jsonb, "id", string);
            let ty = jsonb_access_field!(jsonb, "type", string);
            let ddl_type: TableChangeType = ty.as_str().into();
            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
                tracing::debug!("skip table schema change for create/drop command");
                continue;
            }

            let mut column_descs: Vec<ColumnDesc> = vec![];
            if let Some(table) = jsonb.access_object_field("table")
                && let Some(columns) = table.access_object_field("columns")
            {
                for col in columns.array_elements().unwrap() {
                    let name = jsonb_access_field!(col, "name", string);
                    let type_name = jsonb_access_field!(col, "typeName", string);

                    let data_type = match *connector_props {
                        ConnectorProperties::PostgresCdc(_) => {
                            DataType::from_str(type_name.as_str()).map_err(|err| {
                                tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
                                AccessError::UnsupportedType {
                                    ty: type_name.clone(),
                                }
                            })?
                        }
                        ConnectorProperties::MysqlCdc(_) => {
                            let ty = type_name_to_mysql_type(type_name.as_str());
                            match ty {
                                Some(ty) => mysql_type_to_rw_type(&ty).map_err(|err| {
                                    tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
                                    AccessError::UnsupportedType {
                                        ty: type_name.clone(),
                                    }
                                })?,
                                None => {
                                    Err(AccessError::UnsupportedType { ty: type_name })?
                                }
                            }
                        }
                        _ => {
                            unreachable!()
                        }
                    };

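                    // A non-null `defaultValueExpression` is parsed into a default value of the
                    // column type; if no literal can be extracted from the expression, the column
                    // is created without a default.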
                    let column_desc = match col.access_object_field("defaultValueExpression") {
                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
                            let value_text: Option<String>;
                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
                            match *connector_props {
                                ConnectorProperties::PostgresCdc(_) => {
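                                    // Postgres renders defaults with a cast, e.g.
                                    // `'abc'::character varying`; keep only the quoted literal
                                    // in front of the cast.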
                                    match default_val_expr_str
                                        .split("::")
                                        .map(|s| s.trim_matches('\''))
                                        .next()
                                    {
                                        None => {
                                            value_text = None;
                                        }
                                        Some(val_text) => {
                                            value_text = Some(val_text.to_owned());
                                        }
                                    }
                                }
                                ConnectorProperties::MysqlCdc(_) => {
                                    if data_type == DataType::Timestamptz {
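                                        // MySQL reports TIMESTAMP defaults as a
                                        // `YYYY-MM-DD HH:MM:SS` literal; convert it to a
                                        // timestamptz value first.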
                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
                                            AccessError::TypeError {
                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
                                                got: data_type.to_string(),
                                                value: default_val_expr_str.to_owned(),
                                            }
                                        })?);
                                    } else {
                                        value_text = Some(default_val_expr_str.to_owned());
                                    }
                                }
                                _ => {
                                    unreachable!("connector doesn't support schema change")
                                }
                            }

                            let snapshot_value: Datum = if let Some(value_text) = value_text {
                                Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
                                    |err| {
                                        tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
                                        AccessError::TypeError {
                                            expected: "constant expression".into(),
                                            got: data_type.to_string(),
                                            value: value_text,
                                        }
                                    },
                                )?)
                            } else {
                                None
                            };

                            if snapshot_value.is_none() {
                                tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
                            } else {
                                ColumnDesc::named_with_default_value(
                                    name,
                                    ColumnId::placeholder(),
                                    data_type,
                                    snapshot_value,
                                )
                            }
                        }
                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
                    };
                    column_descs.push(column_desc);
                }
            }

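            // Debezium quotes identifiers in the table change `id` (e.g. `"db"."table"`);
            // strip the quotes before deriving the cdc table id.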
            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
            schema_changes.push(TableSchemaChange {
                cdc_table_id,
                columns: column_descs
                    .into_iter()
                    .map(|column_desc| ColumnCatalog {
                        column_desc,
                        is_hidden: false,
                    })
                    .collect_vec(),
                change_type: ty.as_str().into(),
                upstream_ddl: upstream_ddl.clone(),
            });
        }

        Ok(SchemaChangeEnvelope {
            table_changes: schema_changes,
        })
    } else {
        Err(AccessError::Undefined {
            name: "table schema change".into(),
            path: TABLE_CHANGES.into(),
        })
    }
}

impl<A> DebeziumChangeEvent<A>
where
    A: Access,
{
    pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
        assert!(key_accessor.is_some() || value_accessor.is_some());
        Self {
            value_accessor,
            key_accessor,
            is_mongodb: false,
        }
    }

    pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
        assert!(key_accessor.is_some() || value_accessor.is_some());
        Self {
            value_accessor,
            key_accessor,
            is_mongodb: true,
        }
    }

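    /// Returns the transaction control carried by this event, if the value payload is a
    /// Debezium transaction metadata message (`BEGIN` / `END`); otherwise `None`.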
    pub(crate) fn transaction_control(
        &self,
        connector_props: &ConnectorProperties,
    ) -> Option<TransactionControl> {
        self.value_accessor
            .as_ref()
            .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
    }
}

impl<A> ChangeEvent for DebeziumChangeEvent<A>
where
    A: Access,
{
    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
        match self.op()? {
            ChangeEventOperation::Delete => {
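                // For MongoDB, the deleted document's `_id` is only present in the message key.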
                if self.is_mongodb && desc.name == "_id" {
                    return self
                        .key_accessor
                        .as_ref()
                        .expect("key_accessor must be provided for delete operation")
                        .access(&[&desc.name], &desc.data_type);
                }

                if let Some(va) = self.value_accessor.as_ref() {
                    va.access(&[BEFORE, &desc.name], &desc.data_type)
                } else {
                    self.key_accessor
                        .as_ref()
                        .unwrap()
                        .access(&[&desc.name], &desc.data_type)
                }
            }

            ChangeEventOperation::Upsert => {
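                // Additional columns (commit timestamp, database/schema/table/collection name)
                // are read from the Debezium `source` block; regular columns come from `after`.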
                desc.additional_column.column_type.as_ref().map_or_else(
                    || {
                        self.value_accessor
                            .as_ref()
                            .expect("value_accessor must be provided for upsert operation")
                            .access(&[AFTER, &desc.name], &desc.data_type)
                    },
                    |additional_column_type| {
                        match *additional_column_type {
                            ColumnType::Timestamp(_) => {
                                let ts_ms = self
                                    .value_accessor
                                    .as_ref()
                                    .expect("value_accessor must be provided for upsert operation")
                                    .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
                                Ok(DatumCow::Owned(ts_ms.map(|scalar| {
                                    Timestamptz::from_millis(scalar.into_int64())
                                        .expect("source.ts_ms must be in milliseconds")
                                        .to_scalar_value()
                                })))
                            }
                            ColumnType::DatabaseName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_DB], &desc.data_type),
                            ColumnType::SchemaName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
                            ColumnType::TableName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
                            ColumnType::CollectionName(_) => self
                                .value_accessor
                                .as_ref()
                                .expect("value_accessor must be provided for upsert operation")
                                .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
                            _ => Err(AccessError::UnsupportedAdditionalColumn {
                                name: desc.name.clone(),
                            }),
                        }
                    },
                )
            }
        }
    }

    fn op(&self) -> Result<ChangeEventOperation, AccessError> {
        if let Some(accessor) = &self.value_accessor {
            if let Some(ScalarRefImpl::Utf8(op)) =
                accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
            {
                match op {
                    DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
                        return Ok(ChangeEventOperation::Upsert);
                    }
                    DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
                    _ => (),
                }
            }
            Err(super::AccessError::Undefined {
                name: "op".into(),
                path: Default::default(),
            })
        } else {
            Ok(ChangeEventOperation::Delete)
        }
    }
}

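/// An [`Access`] wrapper over Debezium MongoDB change events.
///
/// With `strong_schema` enabled, fields of the `before` / `after` BSON document are
/// converted individually to the expected column types; otherwise the whole document is
/// exposed through the single JSONB `payload` column.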
pub struct MongoJsonAccess<A> {
    accessor: A,
    strong_schema: bool,
}

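/// Extracts the `_id` of a MongoDB document (given in extended JSON) as a datum of
/// `id_type`. Supported types are jsonb, varchar (plain string or `$oid`), int32
/// (`$numberInt`) and int64 (`$numberLong`).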
pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
    let id_field = if let Some(value) = bson_doc.get("_id") {
        value
    } else {
        bson_doc
    };

    let type_error = || AccessError::TypeError {
        expected: id_type.to_string(),
        got: match id_field {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
        .to_owned(),
        value: id_field.to_string(),
    };

    let id: Datum = match id_type {
        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
        DataType::Varchar => match id_field {
            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
            )),
            _ => return Err(type_error()),
        },
        DataType::Int32 => {
            if let serde_json::Value::Object(obj) = id_field
                && obj.contains_key("$numberInt")
            {
                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
            } else {
                return Err(type_error());
            }
        }
        DataType::Int64 => {
            if let serde_json::Value::Object(obj) = id_field
                && obj.contains_key("$numberLong")
            {
                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
            } else {
                return Err(type_error());
            }
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
    };
    Ok(id)
}

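/// Converts a value from a MongoDB extended JSON document into a datum of `type_expected`.
///
/// If `field` is `Some`, the named field is first looked up in `bson_doc`; otherwise the
/// document itself is converted. A JSON `null` maps to `None`; structs and lists are
/// converted recursively.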
pub fn extract_bson_field(
    type_expected: &DataType,
    bson_doc: &serde_json::Value,
    field: Option<&str>,
) -> AccessResult {
    let type_error = |datum: &serde_json::Value| AccessError::TypeError {
        expected: type_expected.to_string(),
        got: match bson_doc {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
        .to_owned(),
        value: datum.to_string(),
    };

    let datum = if field.is_some() {
        let Some(bson_doc) = bson_doc.get(field.unwrap()) else {
            return Err(type_error(bson_doc));
        };
        bson_doc
    } else {
        bson_doc
    };

    if datum.is_null() {
        return Ok(None);
    }

    let field_datum: Datum = match type_expected {
        DataType::Boolean => {
            if datum.is_boolean() {
                Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
            } else {
                return Err(type_error(datum));
            }
        }
        DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
        DataType::Varchar => match datum {
            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
            serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
                obj["$oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
            }
            _ => return Err(type_error(datum)),
        },
        DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int256
        | DataType::Float32
        | DataType::Float64 => {
            if !datum.is_object() {
                return Err(type_error(datum));
            };

            bson_extract_number(datum, type_expected)?
        }

        DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
            if let serde_json::Value::Object(mp) = datum {
                if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
                    bson_extract_timestamp(datum, type_expected)?
                } else if mp.contains_key("$date") {
                    bson_extract_date(datum, type_expected)?
                } else {
                    return Err(type_error(datum));
                }
            } else {
                return Err(type_error(datum));
            }
        }
        DataType::Decimal => {
            if let serde_json::Value::Object(obj) = datum
                && obj.contains_key("$numberDecimal")
                && obj["$numberDecimal"].is_string()
            {
                let number = obj["$numberDecimal"].as_str().unwrap();

                let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
                    AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "unparsable string".into(),
                        value: number.to_owned(),
                    }
                })?;
                Some(ScalarImpl::Decimal(dec))
            } else {
                return Err(type_error(datum));
            }
        }

        DataType::Bytea => {
            if let serde_json::Value::Object(obj) = datum
                && obj.contains_key("$binary")
                && obj["$binary"].is_object()
            {
                use base64::Engine;

                let binary = obj["$binary"].as_object().unwrap();

                if !binary.contains_key("$base64")
                    || !binary["$base64"].is_string()
                    || !binary.contains_key("$subType")
                    || !binary["$subType"].is_string()
                {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "object".into(),
                        value: datum.to_string(),
                    });
                }

                let b64_str = binary["$base64"]
                    .as_str()
                    .ok_or_else(|| AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "object".into(),
                        value: datum.to_string(),
                    })?;

                let _type_str =
                    binary["$subType"]
                        .as_str()
                        .ok_or_else(|| AccessError::TypeError {
                            expected: type_expected.to_string(),
                            got: "object".into(),
                            value: datum.to_string(),
                        })?;

                let bytes = base64::prelude::BASE64_STANDARD
                    .decode(b64_str)
                    .map_err(|_| AccessError::TypeError {
                        expected: "$binary object with $base64 string and $subType string field"
                            .to_owned(),
                        got: "string".to_owned(),
                        value: bson_doc.to_string(),
                    })?;
                let bytea = ScalarImpl::Bytea(bytes.into());
                Some(bytea)
            } else {
                return Err(type_error(datum));
            }
        }

        DataType::Struct(struct_fields) => {
            let mut datums = vec![];
            for (field_name, field_type) in struct_fields.iter() {
                let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
                datums.push(field_datum);
            }
            let value = StructValue::new(datums);

            Some(ScalarImpl::Struct(value))
        }

        DataType::List(list_type) => {
            let Some(d_array) = datum.as_array() else {
                return Err(type_error(datum));
            };

            let mut builder = list_type.create_array_builder(d_array.len());
            for item in d_array {
                builder.append(extract_bson_field(list_type, item, None)?);
            }
            Some(ScalarImpl::from(ListValue::new(builder.finish())))
        }

        _ => {
            if let Some(field_name) = field {
                unreachable!(
                    "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
                )
            } else {
                let type_expected = type_expected.to_string();
                unreachable!(
                    "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
                )
            }
        }
    };
    Ok(field_datum)
}

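/// Converts a MongoDB extended JSON number wrapper (`$numberInt`, `$numberLong`,
/// `$numberDouble`) into the expected integer or float type, including the special
/// `"Infinity"`, `"-Infinity"` and `"NaN"` string encodings for doubles.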
fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    let field_name = match type_expected {
        DataType::Int16 => "$numberInt",
        DataType::Int32 => "$numberInt",
        DataType::Int64 => "$numberLong",
        DataType::Int256 => "$numberLong",
        DataType::Float32 => "$numberDouble",
        DataType::Float64 => "$numberDouble",
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    let datum = bson_doc.get(field_name);
    if datum.is_none() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    let datum = datum.unwrap();

    if datum.is_string() {
        let Some(num_str) = datum.as_str() else {
            return Err(AccessError::TypeError {
                expected: type_expected.to_string(),
                got: "string".into(),
                value: datum.to_string(),
            });
        };
        if [DataType::Float32, DataType::Float64].contains(type_expected) {
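            // Extended JSON encodes non-finite doubles as the strings below.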
            match (num_str, type_expected) {
                ("Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
                }
                ("Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
                }
                ("-Infinity", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
                }
                ("-Infinity", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
                }
                ("NaN", DataType::Float64) => {
                    return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
                }
                ("NaN", DataType::Float32) => {
                    return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
                }
                _ => {}
            }

            let parsed_num: f64 = match num_str.parse() {
                Ok(n) => n,
                Err(_e) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            if *type_expected == DataType::Float64 {
                return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
            } else {
                let parsed_num = parsed_num as f32;
                return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
            }
        }
        if *type_expected == DataType::Int256 {
            let parsed_num = match Int256::from_str(num_str) {
                Ok(n) => n,
                Err(_) => {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
            };
            return Ok(Some(ScalarImpl::Int256(parsed_num)));
        }

        let parsed_num: i64 = match num_str.parse() {
            Ok(n) => n,
            Err(_e) => {
                return Err(AccessError::TypeError {
                    expected: type_expected.to_string(),
                    got: "string".into(),
                    value: num_str.to_owned(),
                });
            }
        };
        match type_expected {
            DataType::Int16 => {
                if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
            }
            DataType::Int32 => {
                if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
                    return Err(AccessError::TypeError {
                        expected: type_expected.to_string(),
                        got: "string".into(),
                        value: num_str.to_owned(),
                    });
                }
                return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
            }
            DataType::Int64 => {
                return Ok(Some(ScalarImpl::Int64(parsed_num)));
            }
            _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
        }
    }
    if datum.is_null() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "null".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_array() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "array".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_object() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: datum.to_string(),
        });
    }

    if datum.is_boolean() {
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "boolean".into(),
            value: bson_doc.to_string(),
        });
    }

    if datum.is_number() {
        let got_type = if datum.is_f64() { "f64" } else { "i64" };
        return Err(AccessError::TypeError {
            expected: type_expected.to_string(),
            got: got_type.into(),
            value: bson_doc.to_string(),
        });
    }

    Err(AccessError::TypeError {
        expected: type_expected.to_string(),
        got: "unknown".into(),
        value: bson_doc.to_string(),
    })
}

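/// Converts a BSON `$date` value into a date/time datum. The `$date` payload may be a
/// canonical `{ "$numberLong": "<millis>" }` object, a relaxed ISO-8601 string, or a plain
/// number of milliseconds since the epoch.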
fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    let datum = &bson_doc["$date"];

    let type_error = || AccessError::TypeError {
        expected: type_expected.to_string(),
        got: match bson_doc {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
        .to_owned(),
        value: datum.to_string(),
    };

    let millis = match datum {
        serde_json::Value::Object(obj)
            if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
        {
            obj["$numberLong"]
                .as_str()
                .unwrap()
                .parse::<i64>()
                .map_err(|_| AccessError::TypeError {
                    expected: "timestamp".into(),
                    got: "object".into(),
                    value: datum.to_string(),
                })?
        }
        serde_json::Value::String(s) => {
            let dt =
                chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
                    expected: "valid ISO-8601 date string".into(),
                    got: "string".into(),
                    value: datum.to_string(),
                })?;
            dt.timestamp_millis()
        }

        serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
            expected: "timestamp".into(),
            got: "number".into(),
            value: datum.to_string(),
        })?,

        _ => return Err(type_error()),
    };

    let datetime =
        chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
            expected: "timestamp".into(),
            got: "object".into(),
            value: datum.to_string(),
        })?;

    let res = match type_expected {
        DataType::Date => {
            let naive = datetime.naive_local();
            let dt = naive.date();
            Some(ScalarImpl::Date(dt.into()))
        }
        DataType::Time => {
            let naive = datetime.naive_local();
            let dt = naive.time();
            Some(ScalarImpl::Time(dt.into()))
        }
        DataType::Timestamp => {
            let naive = datetime.naive_local();
            let dt = Timestamp::from(naive);
            Some(ScalarImpl::Timestamp(dt))
        }
        DataType::Timestamptz => {
            let dt = datetime.into();
            Some(ScalarImpl::Timestamptz(dt))
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };
    Ok(res)
}

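/// Converts a BSON `$timestamp` value (`{ "t": <seconds since epoch>, "i": <increment> }`)
/// into a date/time datum of `type_expected`.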
fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
    let Some(obj) = bson_doc["$timestamp"].as_object() else {
        return Err(AccessError::TypeError {
            expected: "timestamp".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    };

    if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
    {
        return Err(AccessError::TypeError {
            expected: "timestamp with valid seconds since epoch".into(),
            got: "object".into(),
            value: bson_doc.to_string(),
        });
    }

    let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
        expected: "timestamp with valid seconds since epoch".into(),
        got: "object".into(),
        value: bson_doc.to_string(),
    })?;

    let chrono_datetime =
        chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
            expected: type_expected.to_string(),
            got: "object".to_owned(),
            value: bson_doc.to_string(),
        })?;

    let res = match type_expected {
        DataType::Date => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.date();
            Some(ScalarImpl::Date(dt.into()))
        }
        DataType::Time => {
            let naive = chrono_datetime.naive_local();
            let dt = naive.time();
            Some(ScalarImpl::Time(dt.into()))
        }
        DataType::Timestamp => {
            let naive = chrono_datetime.naive_local();
            let dt = Timestamp::from(naive);
            Some(ScalarImpl::Timestamp(dt))
        }
        DataType::Timestamptz => {
            let dt = chrono_datetime.into();
            Some(ScalarImpl::Timestamptz(dt))
        }
        _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
    };

    Ok(res)
}

impl<A> MongoJsonAccess<A> {
    pub fn new(accessor: A, strong_schema: bool) -> Self {
        Self {
            accessor,
            strong_schema,
        }
    }
}

impl<A> Access for MongoJsonAccess<A>
where
    A: Access,
{
    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
        match path {
            ["after" | "before", "_id"] => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                } else {
                    Err(AccessError::Undefined {
                        name: "_id".to_owned(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            ["after" | "before", "payload"] if !self.strong_schema => {
                self.access(&[path[0]], &DataType::Jsonb)
            }

            ["after" | "before", field] if self.strong_schema => {
                let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
                if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
                    Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
                } else {
                    Err(AccessError::Undefined {
                        name: field.to_string(),
                        path: path[0].to_owned(),
                    })?
                }
            }

            ["_id"] => {
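                // Delete events carry the document key as a top-level `id` field rather than
                // `_id`; if the direct lookup is undefined, fall back to extracting the id
                // from that field.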
                let ret = self.accessor.access(path, type_expected);
                if matches!(ret, Err(AccessError::Undefined { .. })) {
                    let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
                    if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
                        Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
                    } else {
                        Err(AccessError::Undefined {
                            name: "_id".to_owned(),
                            path: "id".to_owned(),
                        })?
                    }
                } else {
                    ret
                }
            }
            _ => self.accessor.access(path, type_expected),
        }
    }
}