1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::types::{
20 DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
21 Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
22};
23use risingwave_connector_codec::decoder::AccessExt;
24use risingwave_pb::plan_common::additional_column::ColumnType;
25use thiserror_ext::AsReport;
26
27use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
28use crate::parser::TransactionControl;
29use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
30use crate::parser::schema_change::TableChangeType;
31use crate::source::cdc::build_cdc_table_id;
32use crate::source::cdc::external::mysql::{
33 mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
34};
35use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
36use crate::source::{ConnectorProperties, SourceColumnDesc};
37
/// Reports whether a column default expression is a sequence call.
///
/// Returns `true` when the expression, ignoring surrounding whitespace,
/// begins with `nextval(`. Empty or whitespace-only input yields `false`.
fn is_bigserial_default(default_value_expression: &str) -> bool {
    // An empty string can never start with the prefix, so no separate
    // emptiness check is required.
    default_value_expression.trim().starts_with("nextval(")
}
62
/// A single Debezium change event, viewed through accessors over its key and
/// value parts.
///
/// The constructors in this file assert that at least one of the two
/// accessors is present.
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message value; when absent the event is treated as a delete.
    value_accessor: Option<A>,
    /// Accessor over the message key; used as the fallback source of column
    /// values for deletes.
    key_accessor: Option<A>,
    /// Whether the event comes from MongoDB, which changes how `_id` is read on delete.
    is_mongodb: bool,
}
101
/// Envelope member holding the row image read for delete events.
const BEFORE: &str = "before";
/// Envelope member holding the row image read for upsert events.
const AFTER: &str = "after";

/// Member of a schema change message carrying the upstream DDL text.
const UPSTREAM_DDL: &str = "ddl";
/// Envelope member under which the source metadata fields below are looked up.
const SOURCE: &str = "source";
/// Source metadata: event time in milliseconds.
const SOURCE_TS_MS: &str = "ts_ms";
/// Source metadata: database name.
const SOURCE_DB: &str = "db";
/// Source metadata: schema name.
const SOURCE_SCHEMA: &str = "schema";
/// Source metadata: table name.
const SOURCE_TABLE: &str = "table";
/// Source metadata: collection name.
const SOURCE_COLLECTION: &str = "collection";

/// Envelope member carrying the operation code (see the `DEBEZIUM_*_OP` constants).
const OP: &str = "op";
/// Member of a transaction metadata message carrying its status.
pub const TRANSACTION_STATUS: &str = "status";
/// Member of a transaction metadata message carrying the transaction id.
pub const TRANSACTION_ID: &str = "id";

/// Member of a schema change message carrying the list of per-table changes.
pub const TABLE_CHANGES: &str = "tableChanges";

/// Operation code mapped to an upsert.
pub const DEBEZIUM_READ_OP: &str = "r";
/// Operation code mapped to an upsert.
pub const DEBEZIUM_CREATE_OP: &str = "c";
/// Operation code mapped to an upsert.
pub const DEBEZIUM_UPDATE_OP: &str = "u";
/// Operation code mapped to a delete.
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// Transaction status value that opens a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// Transaction status value that commits a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
126
127pub fn parse_transaction_meta(
128 accessor: &impl Access,
129 connector_props: &ConnectorProperties,
130) -> AccessResult<TransactionControl> {
131 if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
132 accessor
133 .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
134 .to_datum_ref(),
135 accessor
136 .access(&[TRANSACTION_ID], &DataType::Varchar)?
137 .to_datum_ref(),
138 ) {
139 match status {
144 DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
145 ConnectorProperties::PostgresCdc(_) => {
146 let (tx_id, _) = id.split_once(':').unwrap();
147 return Ok(TransactionControl::Begin { id: tx_id.into() });
148 }
149 ConnectorProperties::MysqlCdc(_) => {
150 return Ok(TransactionControl::Begin { id: id.into() });
151 }
152 ConnectorProperties::SqlServerCdc(_) => {
153 return Ok(TransactionControl::Begin { id: id.into() });
154 }
155 _ => {}
156 },
157 DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
158 ConnectorProperties::PostgresCdc(_) => {
159 let (tx_id, _) = id.split_once(':').unwrap();
160 return Ok(TransactionControl::Commit { id: tx_id.into() });
161 }
162 ConnectorProperties::MysqlCdc(_) => {
163 return Ok(TransactionControl::Commit { id: id.into() });
164 }
165 ConnectorProperties::SqlServerCdc(_) => {
166 return Ok(TransactionControl::Commit { id: id.into() });
167 }
168 _ => {}
169 },
170 _ => {}
171 }
172 }
173
174 Err(AccessError::Undefined {
175 name: "transaction status".into(),
176 path: TRANSACTION_STATUS.into(),
177 })
178}
179
/// Reads member `$key` of the jsonb object `$jsonb` and converts it with the
/// `as_<$kind>()` method (e.g. `string` selects `as_string()`).
///
/// Both the member lookup and the conversion are unwrapped, so the expansion
/// panics when either of them fails.
macro_rules! jsonb_access_field {
    ($jsonb:expr, $key:expr, $kind:tt) => {
        $crate::paste! {
            $jsonb.access_object_field($key).unwrap().[<as_ $kind>]().unwrap()
        }
    };
}
187
188pub fn parse_schema_change(
192 accessor: &impl Access,
193 source_id: u32,
194 source_name: &str,
195 connector_props: &ConnectorProperties,
196) -> AccessResult<SchemaChangeEnvelope> {
197 let mut schema_changes = vec![];
198
199 let upstream_ddl: String = accessor
200 .access(&[UPSTREAM_DDL], &DataType::Varchar)?
201 .to_owned_datum()
202 .unwrap()
203 .as_utf8()
204 .to_string();
205
206 if let Some(ScalarRefImpl::List(table_changes)) = accessor
207 .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
208 .to_datum_ref()
209 {
210 for datum in table_changes.iter() {
211 let jsonb = match datum {
212 Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
213 _ => unreachable!(""),
214 };
215 let id: String = jsonb_access_field!(jsonb, "id", string);
216 let ty = jsonb_access_field!(jsonb, "type", string);
217
218 let table_name = id.trim_matches('"').to_owned();
219 let ddl_type: TableChangeType = ty.as_str().into();
220 if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
221 tracing::debug!("skip table schema change for create/drop command");
222 continue;
223 }
224
225 let mut column_descs: Vec<ColumnDesc> = vec![];
226 if let Some(table) = jsonb.access_object_field("table")
227 && let Some(columns) = table.access_object_field("columns")
228 {
229 for col in columns.array_elements().unwrap() {
230 let name = jsonb_access_field!(col, "name", string);
231 let type_name = jsonb_access_field!(col, "typeName", string);
232 let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
234 let data_type = match *connector_props {
235 ConnectorProperties::PostgresCdc(_) => {
236 let ty = type_name_to_pg_type(type_name.as_str());
237 if is_enum {
238 tracing::debug!(target: "auto_schema_change",
239 "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
240 DataType::Varchar
241 } else {
242 match ty {
243 Some(ty) => match pg_type_to_rw_type(&ty) {
244 Ok(data_type) => data_type,
245 Err(err) => {
246 tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
247 return Err(AccessError::CdcAutoSchemaChangeError {
248 ty: type_name,
249 table_name: format!(
250 "{}.{}",
251 source_name, table_name
252 ),
253 });
254 }
255 },
256 None => {
257 return Err(AccessError::CdcAutoSchemaChangeError {
258 ty: type_name,
259 table_name: format!("{}.{}", source_name, table_name),
260 });
261 }
262 }
263 }
264 }
265 ConnectorProperties::MysqlCdc(_) => {
266 let ty = type_name_to_mysql_type(type_name.as_str());
267 match ty {
268 Some(ty) => match mysql_type_to_rw_type(&ty) {
269 Ok(data_type) => data_type,
270 Err(err) => {
271 tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
272 return Err(AccessError::CdcAutoSchemaChangeError {
273 ty: type_name,
274 table_name: format!("{}.{}", source_name, table_name),
275 });
276 }
277 },
278 None => {
279 return Err(AccessError::CdcAutoSchemaChangeError {
280 ty: type_name,
281 table_name: format!("{}.{}", source_name, table_name),
282 });
283 }
284 }
285 }
286 _ => {
287 unreachable!()
288 }
289 };
290
291 let column_desc = match col.access_object_field("defaultValueExpression") {
293 Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
294 let default_val_expr_str = default_val_expr_str.as_str().unwrap();
295 if is_bigserial_default(default_val_expr_str) {
297 tracing::warn!(target: "auto_schema_change",
298 "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
299 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
300 } else {
301 let value_text: Option<String>;
302 match *connector_props {
303 ConnectorProperties::PostgresCdc(_) => {
304 match default_val_expr_str
307 .split("::")
308 .map(|s| s.trim_matches('\''))
309 .next()
310 {
311 None => {
312 value_text = None;
313 }
314 Some(val_text) => {
315 value_text = Some(val_text.to_owned());
316 }
317 }
318 }
319 ConnectorProperties::MysqlCdc(_) => {
320 if data_type == DataType::Timestamptz {
323 value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
324 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
325 AccessError::TypeError {
326 expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
327 got: data_type.to_string(),
328 value: default_val_expr_str.to_owned(),
329 }
330 })?);
331 } else {
332 value_text = Some(default_val_expr_str.to_owned());
333 }
334 }
335 _ => {
336 unreachable!("connector doesn't support schema change")
337 }
338 }
339
340 let snapshot_value: Datum = if let Some(value_text) = value_text {
341 Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
342 |err| {
343 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
344 AccessError::TypeError {
345 expected: "constant expression".into(),
346 got: data_type.to_string(),
347 value: value_text,
348 }
349 },
350 )?)
351 } else {
352 None
353 };
354
355 if snapshot_value.is_none() {
356 tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
357 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
358 } else {
359 ColumnDesc::named_with_default_value(
360 name,
361 ColumnId::placeholder(),
362 data_type,
363 snapshot_value,
364 )
365 }
366 }
367 }
368 _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
369 };
370 column_descs.push(column_desc);
371 }
372 }
373
374 let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
376 schema_changes.push(TableSchemaChange {
377 cdc_table_id,
378 columns: column_descs
379 .into_iter()
380 .map(|column_desc| ColumnCatalog {
381 column_desc,
382 is_hidden: false,
383 })
384 .collect_vec(),
385 change_type: ty.as_str().into(),
386 upstream_ddl: upstream_ddl.clone(),
387 });
388 }
389
390 Ok(SchemaChangeEnvelope {
391 table_changes: schema_changes,
392 })
393 } else {
394 Err(AccessError::Undefined {
395 name: "table schema change".into(),
396 path: TABLE_CHANGES.into(),
397 })
398 }
399}
400
401impl<A> DebeziumChangeEvent<A>
402where
403 A: Access,
404{
405 pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
407 assert!(key_accessor.is_some() || value_accessor.is_some());
408 Self {
409 value_accessor,
410 key_accessor,
411 is_mongodb: false,
412 }
413 }
414
415 pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
416 assert!(key_accessor.is_some() || value_accessor.is_some());
417 Self {
418 value_accessor,
419 key_accessor,
420 is_mongodb: true,
421 }
422 }
423
424 pub(crate) fn transaction_control(
428 &self,
429 connector_props: &ConnectorProperties,
430 ) -> Option<TransactionControl> {
431 self.value_accessor
434 .as_ref()
435 .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
436 }
437}
438
439impl<A> ChangeEvent for DebeziumChangeEvent<A>
440where
441 A: Access,
442{
443 fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
444 match self.op()? {
445 ChangeEventOperation::Delete => {
446 if self.is_mongodb && desc.name == "_id" {
449 return self
450 .key_accessor
451 .as_ref()
452 .expect("key_accessor must be provided for delete operation")
453 .access(&[&desc.name], &desc.data_type);
454 }
455
456 if let Some(va) = self.value_accessor.as_ref() {
457 va.access(&[BEFORE, &desc.name], &desc.data_type)
458 } else {
459 self.key_accessor
460 .as_ref()
461 .unwrap()
462 .access(&[&desc.name], &desc.data_type)
463 }
464 }
465
466 ChangeEventOperation::Upsert => {
468 desc.additional_column.column_type.as_ref().map_or_else(
470 || {
471 self.value_accessor
472 .as_ref()
473 .expect("value_accessor must be provided for upsert operation")
474 .access(&[AFTER, &desc.name], &desc.data_type)
475 },
476 |additional_column_type| {
477 match *additional_column_type {
478 ColumnType::Timestamp(_) => {
479 let ts_ms = self
481 .value_accessor
482 .as_ref()
483 .expect("value_accessor must be provided for upsert operation")
484 .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
485 Ok(DatumCow::Owned(ts_ms.map(|scalar| {
486 Timestamptz::from_millis(scalar.into_int64())
487 .expect("source.ts_ms must in millisecond")
488 .to_scalar_value()
489 })))
490 }
491 ColumnType::DatabaseName(_) => self
492 .value_accessor
493 .as_ref()
494 .expect("value_accessor must be provided for upsert operation")
495 .access(&[SOURCE, SOURCE_DB], &desc.data_type),
496 ColumnType::SchemaName(_) => self
497 .value_accessor
498 .as_ref()
499 .expect("value_accessor must be provided for upsert operation")
500 .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
501 ColumnType::TableName(_) => self
502 .value_accessor
503 .as_ref()
504 .expect("value_accessor must be provided for upsert operation")
505 .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
506 ColumnType::CollectionName(_) => self
507 .value_accessor
508 .as_ref()
509 .expect("value_accessor must be provided for upsert operation")
510 .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
511 _ => Err(AccessError::UnsupportedAdditionalColumn {
512 name: desc.name.clone(),
513 }),
514 }
515 },
516 )
517 }
518 }
519 }
520
521 fn op(&self) -> Result<ChangeEventOperation, AccessError> {
522 if let Some(accessor) = &self.value_accessor {
523 if let Some(ScalarRefImpl::Utf8(op)) =
524 accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
525 {
526 match op {
527 DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
528 return Ok(ChangeEventOperation::Upsert);
529 }
530 DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
531 _ => (),
532 }
533 }
534 Err(super::AccessError::Undefined {
535 name: "op".into(),
536 path: Default::default(),
537 })
538 } else {
539 Ok(ChangeEventOperation::Delete)
540 }
541 }
542}
543
/// An [`Access`] wrapper that adds MongoDB-specific handling of `_id` and of
/// fields nested under `before` / `after` on top of an inner accessor.
pub struct MongoJsonAccess<A> {
    /// The wrapped accessor that performs the actual lookups.
    accessor: A,
    /// When set, individual fields are extracted from the `before` / `after`
    /// document by type; otherwise the whole document is exposed as `payload`.
    strong_schema: bool,
}
551
552pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
553 let id_field = if let Some(value) = bson_doc.get("_id") {
554 value
555 } else {
556 bson_doc
557 };
558
559 let type_error = || AccessError::TypeError {
560 expected: id_type.to_string(),
561 got: match id_field {
562 serde_json::Value::Null => "null",
563 serde_json::Value::Bool(_) => "bool",
564 serde_json::Value::Number(_) => "number",
565 serde_json::Value::String(_) => "string",
566 serde_json::Value::Array(_) => "array",
567 serde_json::Value::Object(_) => "object",
568 }
569 .to_owned(),
570 value: id_field.to_string(),
571 };
572
573 let id: Datum = match id_type {
574 DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
575 DataType::Varchar => match id_field {
576 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
577 serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
578 obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
579 )),
580 _ => return Err(type_error()),
581 },
582 DataType::Int32 => {
583 if let serde_json::Value::Object(obj) = id_field
584 && obj.contains_key("$numberInt")
585 {
586 let int_str = obj["$numberInt"].as_str().unwrap_or_default();
587 Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
588 } else {
589 return Err(type_error());
590 }
591 }
592 DataType::Int64 => {
593 if let serde_json::Value::Object(obj) = id_field
594 && obj.contains_key("$numberLong")
595 {
596 let int_str = obj["$numberLong"].as_str().unwrap_or_default();
597 Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
598 } else {
599 return Err(type_error());
600 }
601 }
602 _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
603 };
604 Ok(id)
605}
606
607pub fn extract_bson_field(
620 type_expected: &DataType,
621 bson_doc: &serde_json::Value,
622 field: Option<&str>,
623) -> AccessResult {
624 let type_error = |datum: &serde_json::Value| AccessError::TypeError {
625 expected: type_expected.to_string(),
626 got: match bson_doc {
627 serde_json::Value::Null => "null",
628 serde_json::Value::Bool(_) => "bool",
629 serde_json::Value::Number(_) => "number",
630 serde_json::Value::String(_) => "string",
631 serde_json::Value::Array(_) => "array",
632 serde_json::Value::Object(_) => "object",
633 }
634 .to_owned(),
635 value: datum.to_string(),
636 };
637
638 let datum = if let Some(field) = field {
639 let Some(bson_doc) = bson_doc.get(field) else {
640 return Err(type_error(bson_doc));
641 };
642 bson_doc
643 } else {
644 bson_doc
645 };
646
647 if datum.is_null() {
648 return Ok(None);
649 }
650
651 let field_datum: Datum = match type_expected {
652 DataType::Boolean => {
653 if datum.is_boolean() {
654 Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
655 } else {
656 return Err(type_error(datum));
657 }
658 }
659 DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
660 DataType::Varchar => match datum {
661 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
662 serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
663 obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
664 }
665 _ => return Err(type_error(datum)),
666 },
667 DataType::Int16
668 | DataType::Int32
669 | DataType::Int64
670 | DataType::Int256
671 | DataType::Float32
672 | DataType::Float64 => {
673 if !datum.is_object() {
674 return Err(type_error(datum));
675 };
676
677 bson_extract_number(datum, type_expected)?
678 }
679
680 DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
681 if let serde_json::Value::Object(mp) = datum {
682 if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
683 bson_extract_timestamp(datum, type_expected)?
684 } else if mp.contains_key("$date") {
685 bson_extract_date(datum, type_expected)?
686 } else {
687 return Err(type_error(datum));
688 }
689 } else {
690 return Err(type_error(datum));
691 }
692 }
693 DataType::Decimal => {
694 if let serde_json::Value::Object(obj) = datum
695 && obj.contains_key("$numberDecimal")
696 && obj["$numberDecimal"].is_string()
697 {
698 let number = obj["$numberDecimal"].as_str().unwrap();
699
700 let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
701 AccessError::TypeError {
702 expected: type_expected.to_string(),
703 got: "unparsable string".into(),
704 value: number.to_owned(),
705 }
706 })?;
707 Some(ScalarImpl::Decimal(dec))
708 } else {
709 return Err(type_error(datum));
710 }
711 }
712
713 DataType::Bytea => {
714 if let serde_json::Value::Object(obj) = datum
715 && obj.contains_key("$binary")
716 && obj["$binary"].is_object()
717 {
718 use base64::Engine;
719
720 let binary = obj["$binary"].as_object().unwrap();
721
722 if !binary.contains_key("$base64")
723 || !binary["$base64"].is_string()
724 || !binary.contains_key("$subType")
725 || !binary["$subType"].is_string()
726 {
727 return Err(AccessError::TypeError {
728 expected: type_expected.to_string(),
729 got: "object".into(),
730 value: datum.to_string(),
731 });
732 }
733
734 let b64_str = binary["$base64"]
735 .as_str()
736 .ok_or_else(|| AccessError::TypeError {
737 expected: type_expected.to_string(),
738 got: "object".into(),
739 value: datum.to_string(),
740 })?;
741
742 let _type_str =
744 binary["$subType"]
745 .as_str()
746 .ok_or_else(|| AccessError::TypeError {
747 expected: type_expected.to_string(),
748 got: "object".into(),
749 value: datum.to_string(),
750 })?;
751
752 let bytes = base64::prelude::BASE64_STANDARD
753 .decode(b64_str)
754 .map_err(|_| AccessError::TypeError {
755 expected: "$binary object with $base64 string and $subType string field"
756 .to_owned(),
757 got: "string".to_owned(),
758 value: bson_doc.to_string(),
759 })?;
760 let bytea = ScalarImpl::Bytea(bytes.into());
761 Some(bytea)
762 } else {
763 return Err(type_error(datum));
764 }
765 }
766
767 DataType::Struct(struct_fields) => {
768 let mut datums = vec![];
769 for (field_name, field_type) in struct_fields.iter() {
770 let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
771 datums.push(field_datum);
772 }
773 let value = StructValue::new(datums);
774
775 Some(ScalarImpl::Struct(value))
776 }
777
778 DataType::List(list_type) => {
779 let Some(d_array) = datum.as_array() else {
780 return Err(type_error(datum));
781 };
782
783 let mut builder = list_type.create_array_builder(d_array.len());
784 for item in d_array {
785 builder.append(extract_bson_field(list_type, item, None)?);
786 }
787 Some(ScalarImpl::from(ListValue::new(builder.finish())))
788 }
789
790 _ => {
791 if let Some(field_name) = field {
792 unreachable!(
793 "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
794 )
795 } else {
796 let type_expected = type_expected.to_string();
797 unreachable!(
798 "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
799 )
800 }
801 }
802 };
803 Ok(field_datum)
804}
805
806fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
807 let field_name = match type_expected {
808 DataType::Int16 => "$numberInt",
809 DataType::Int32 => "$numberInt",
810 DataType::Int64 => "$numberLong",
811 DataType::Int256 => "$numberLong",
812 DataType::Float32 => "$numberDouble",
813 DataType::Float64 => "$numberDouble",
814 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
815 };
816
817 let datum = bson_doc.get(field_name);
818 if datum.is_none() {
819 return Err(AccessError::TypeError {
820 expected: type_expected.to_string(),
821 got: "object".into(),
822 value: bson_doc.to_string(),
823 });
824 }
825
826 let datum = datum.unwrap();
827
828 if datum.is_string() {
829 let Some(num_str) = datum.as_str() else {
830 return Err(AccessError::TypeError {
831 expected: type_expected.to_string(),
832 got: "string".into(),
833 value: datum.to_string(),
834 });
835 };
836 if [DataType::Float32, DataType::Float64].contains(type_expected) {
838 match (num_str, type_expected) {
839 ("Infinity", DataType::Float64) => {
840 return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
841 }
842 ("Infinity", DataType::Float32) => {
843 return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
844 }
845 ("-Infinity", DataType::Float64) => {
846 return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
847 }
848 ("-Infinity", DataType::Float32) => {
849 return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
850 }
851 ("NaN", DataType::Float64) => {
852 return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
853 }
854 ("NaN", DataType::Float32) => {
855 return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
856 }
857 _ => {}
858 }
859
860 let parsed_num: f64 = match num_str.parse() {
861 Ok(n) => n,
862 Err(_e) => {
863 return Err(AccessError::TypeError {
864 expected: type_expected.to_string(),
865 got: "string".into(),
866 value: num_str.to_owned(),
867 });
868 }
869 };
870 if *type_expected == DataType::Float64 {
871 return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
872 } else {
873 let parsed_num = parsed_num as f32;
874 return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
875 }
876 }
877 if *type_expected == DataType::Int256 {
879 let parsed_num = match Int256::from_str(num_str) {
880 Ok(n) => n,
881 Err(_) => {
882 return Err(AccessError::TypeError {
883 expected: type_expected.to_string(),
884 got: "string".into(),
885 value: num_str.to_owned(),
886 });
887 }
888 };
889 return Ok(Some(ScalarImpl::Int256(parsed_num)));
890 }
891
892 let parsed_num: i64 = match num_str.parse() {
894 Ok(n) => n,
895 Err(_e) => {
896 return Err(AccessError::TypeError {
897 expected: type_expected.to_string(),
898 got: "string".into(),
899 value: num_str.to_owned(),
900 });
901 }
902 };
903 match type_expected {
904 DataType::Int16 => {
905 if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
906 return Err(AccessError::TypeError {
907 expected: type_expected.to_string(),
908 got: "string".into(),
909 value: num_str.to_owned(),
910 });
911 }
912 return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
913 }
914 DataType::Int32 => {
915 if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
916 return Err(AccessError::TypeError {
917 expected: type_expected.to_string(),
918 got: "string".into(),
919 value: num_str.to_owned(),
920 });
921 }
922 return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
923 }
924 DataType::Int64 => {
925 return Ok(Some(ScalarImpl::Int64(parsed_num)));
926 }
927 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
928 }
929 }
930 if datum.is_null() {
931 return Err(AccessError::TypeError {
932 expected: type_expected.to_string(),
933 got: "null".into(),
934 value: bson_doc.to_string(),
935 });
936 }
937
938 if datum.is_array() {
939 return Err(AccessError::TypeError {
940 expected: type_expected.to_string(),
941 got: "array".to_owned(),
942 value: datum.to_string(),
943 });
944 }
945
946 if datum.is_object() {
947 return Err(AccessError::TypeError {
948 expected: type_expected.to_string(),
949 got: "object".to_owned(),
950 value: datum.to_string(),
951 });
952 }
953
954 if datum.is_boolean() {
955 return Err(AccessError::TypeError {
956 expected: type_expected.to_string(),
957 got: "boolean".into(),
958 value: bson_doc.to_string(),
959 });
960 }
961
962 if datum.is_number() {
963 let got_type = if datum.is_f64() { "f64" } else { "i64" };
964 return Err(AccessError::TypeError {
965 expected: type_expected.to_string(),
966 got: got_type.into(),
967 value: bson_doc.to_string(),
968 });
969 }
970
971 Err(AccessError::TypeError {
972 expected: type_expected.to_string(),
973 got: "unknown".into(),
974 value: bson_doc.to_string(),
975 })
976}
977
978fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
979 let datum = &bson_doc["$date"];
994
995 let type_error = || AccessError::TypeError {
996 expected: type_expected.to_string(),
997 got: match bson_doc {
998 serde_json::Value::Null => "null",
999 serde_json::Value::Bool(_) => "bool",
1000 serde_json::Value::Number(_) => "number",
1001 serde_json::Value::String(_) => "string",
1002 serde_json::Value::Array(_) => "array",
1003 serde_json::Value::Object(_) => "object",
1004 }
1005 .to_owned(),
1006 value: datum.to_string(),
1007 };
1008
1009 let millis = match datum {
1011 serde_json::Value::Object(obj)
1013 if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1014 {
1015 obj["$numberLong"]
1016 .as_str()
1017 .unwrap()
1018 .parse::<i64>()
1019 .map_err(|_| AccessError::TypeError {
1020 expected: "timestamp".into(),
1021 got: "object".into(),
1022 value: datum.to_string(),
1023 })?
1024 }
1025 serde_json::Value::String(s) => {
1027 let dt =
1028 chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1029 expected: "valid ISO-8601 date string".into(),
1030 got: "string".into(),
1031 value: datum.to_string(),
1032 })?;
1033 dt.timestamp_millis()
1034 }
1035
1036 serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1039 expected: "timestamp".into(),
1040 got: "number".into(),
1041 value: datum.to_string(),
1042 })?,
1043
1044 _ => return Err(type_error()),
1045 };
1046
1047 let datetime =
1048 chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1049 expected: "timestamp".into(),
1050 got: "object".into(),
1051 value: datum.to_string(),
1052 })?;
1053
1054 let res = match type_expected {
1055 DataType::Date => {
1056 let naive = datetime.naive_local();
1057 let dt = naive.date();
1058 Some(ScalarImpl::Date(dt.into()))
1059 }
1060 DataType::Time => {
1061 let naive = datetime.naive_local();
1062 let dt = naive.time();
1063 Some(ScalarImpl::Time(dt.into()))
1064 }
1065 DataType::Timestamp => {
1066 let naive = datetime.naive_local();
1067 let dt = Timestamp::from(naive);
1068 Some(ScalarImpl::Timestamp(dt))
1069 }
1070 DataType::Timestamptz => {
1071 let dt = datetime.into();
1072 Some(ScalarImpl::Timestamptz(dt))
1073 }
1074 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1075 };
1076 Ok(res)
1077}
1078
1079fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1080 let Some(obj) = bson_doc["$timestamp"].as_object() else {
1098 return Err(AccessError::TypeError {
1099 expected: "timestamp".into(),
1100 got: "object".into(),
1101 value: bson_doc.to_string(),
1102 });
1103 };
1104
1105 if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1106 {
1107 return Err(AccessError::TypeError {
1108 expected: "timestamp with valid seconds since epoch".into(),
1109 got: "object".into(),
1110 value: bson_doc.to_string(),
1111 });
1112 }
1113
1114 let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1115 expected: "timestamp with valid seconds since epoch".into(),
1116 got: "object".into(),
1117 value: bson_doc.to_string(),
1118 })?;
1119
1120 let chrono_datetime =
1121 chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1122 expected: type_expected.to_string(),
1123 got: "object".to_owned(),
1124 value: bson_doc.to_string(),
1125 })?;
1126
1127 let res = match type_expected {
1128 DataType::Date => {
1129 let naive = chrono_datetime.naive_local();
1130 let dt = naive.date();
1131 Some(ScalarImpl::Date(dt.into()))
1132 }
1133 DataType::Time => {
1134 let naive = chrono_datetime.naive_local();
1135 let dt = naive.time();
1136 Some(ScalarImpl::Time(dt.into()))
1137 }
1138 DataType::Timestamp => {
1139 let naive = chrono_datetime.naive_local();
1140 let dt = Timestamp::from(naive);
1141 Some(ScalarImpl::Timestamp(dt))
1142 }
1143 DataType::Timestamptz => {
1144 let dt = chrono_datetime.into();
1145 Some(ScalarImpl::Timestamptz(dt))
1146 }
1147 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1148 };
1149
1150 Ok(res)
1151}
1152
1153impl<A> MongoJsonAccess<A> {
1154 pub fn new(accessor: A, strong_schema: bool) -> Self {
1155 Self {
1156 accessor,
1157 strong_schema,
1158 }
1159 }
1160}
1161
1162impl<A> Access for MongoJsonAccess<A>
1163where
1164 A: Access,
1165{
1166 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1167 match path {
1168 ["after" | "before", "_id"] => {
1169 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1170 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1171 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1172 } else {
1173 Err(AccessError::Undefined {
1175 name: "_id".to_owned(),
1176 path: path[0].to_owned(),
1177 })?
1178 }
1179 }
1180
1181 ["after" | "before", "payload"] if !self.strong_schema => {
1182 self.access(&[path[0]], &DataType::Jsonb)
1183 }
1184
1185 ["after" | "before", field] if self.strong_schema => {
1186 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1187 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1188 Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1189 } else {
1190 Err(AccessError::Undefined {
1192 name: field.to_string(),
1193 path: path[0].to_owned(),
1194 })?
1195 }
1196 }
1197
1198 ["_id"] => {
1202 let ret = self.accessor.access(path, type_expected);
1203 if matches!(ret, Err(AccessError::Undefined { .. })) {
1204 let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1205 if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1206 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1207 } else {
1208 Err(AccessError::Undefined {
1210 name: "_id".to_owned(),
1211 path: "id".to_owned(),
1212 })?
1213 }
1214 } else {
1215 ret
1216 }
1217 }
1218 _ => self.accessor.access(path, type_expected),
1219 }
1220 }
1221}