1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21 DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22 Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29use crate::parser::TransactionControl;
30use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
31use crate::parser::schema_change::TableChangeType;
32use crate::source::cdc::build_cdc_table_id;
33use crate::source::cdc::external::mysql::{
34 mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
35};
36use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
37use crate::source::{ConnectorProperties, SourceColumnDesc};
38
/// Returns `true` when a column default expression is a sequence call (`nextval(...)`), which
/// is how the default of a BIGSERIAL-style column is reported.
///
/// Leading/trailing whitespace is ignored; a blank expression never matches.
fn is_bigserial_default(default_value_expression: &str) -> bool {
    // Only the leading side matters for a prefix test; an all-blank input trims to "" and
    // therefore cannot start with `nextval(`.
    default_value_expression
        .trim_start()
        .starts_with("nextval(")
}
63
/// A Debezium change event, read through a key accessor and/or a value accessor.
///
/// At least one of the two accessors is present; both constructors assert this.
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message value. When `None`, `op` reports the event as a delete.
    value_accessor: Option<A>,
    /// Accessor over the message key. It is read for deletes without a value, and for the
    /// MongoDB `_id` column of deletes.
    key_accessor: Option<A>,
    /// Set by `new_mongodb_event`; enables the MongoDB-specific `_id` handling on deletes.
    is_mongodb: bool,
}
102
/// Envelope field read for columns of delete events.
const BEFORE: &str = "before";
/// Envelope field read for columns of upsert (read/create/update) events.
const AFTER: &str = "after";

/// Field holding the upstream DDL statement text in schema-change events.
const UPSTREAM_DDL: &str = "ddl";
/// Envelope field whose children are read for additional (metadata) columns.
const SOURCE: &str = "source";
/// Child of `source` read as an `INT64` of milliseconds for the timestamp additional column.
const SOURCE_TS_MS: &str = "ts_ms";
/// Child of `source` read for the database-name additional column.
const SOURCE_DB: &str = "db";
/// Child of `source` read for the schema-name additional column.
const SOURCE_SCHEMA: &str = "schema";
/// Child of `source` read for the table-name additional column.
const SOURCE_TABLE: &str = "table";
/// Child of `source` read for the collection-name additional column.
const SOURCE_COLLECTION: &str = "collection";

/// Field holding the Debezium operation code (see the `DEBEZIUM_*_OP` constants).
const OP: &str = "op";
/// Field of a transaction metadata message holding its status (`BEGIN` / `END`).
pub const TRANSACTION_STATUS: &str = "status";
/// Field of a transaction metadata message holding the transaction id.
pub const TRANSACTION_ID: &str = "id";

/// Field of a schema-change event holding the list of table changes (read as `jsonb[]`).
pub const TABLE_CHANGES: &str = "tableChanges";

/// Operation code mapped to an upsert (snapshot read).
pub const DEBEZIUM_READ_OP: &str = "r";
/// Operation code mapped to an upsert (create).
pub const DEBEZIUM_CREATE_OP: &str = "c";
/// Operation code mapped to an upsert (update).
pub const DEBEZIUM_UPDATE_OP: &str = "u";
/// Operation code mapped to a delete.
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// Transaction status that starts a transaction.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// Transaction status that commits a transaction (Debezium reports it as `END`).
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
127
128pub fn parse_transaction_meta(
129 accessor: &impl Access,
130 connector_props: &ConnectorProperties,
131) -> AccessResult<TransactionControl> {
132 if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
133 accessor
134 .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
135 .to_datum_ref(),
136 accessor
137 .access(&[TRANSACTION_ID], &DataType::Varchar)?
138 .to_datum_ref(),
139 ) {
140 match status {
145 DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
146 ConnectorProperties::PostgresCdc(_) => {
147 let (tx_id, _) = id.split_once(':').unwrap();
148 return Ok(TransactionControl::Begin { id: tx_id.into() });
149 }
150 ConnectorProperties::MysqlCdc(_) => {
151 return Ok(TransactionControl::Begin { id: id.into() });
152 }
153 ConnectorProperties::SqlServerCdc(_) => {
154 return Ok(TransactionControl::Begin { id: id.into() });
155 }
156 _ => {}
157 },
158 DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
159 ConnectorProperties::PostgresCdc(_) => {
160 let (tx_id, _) = id.split_once(':').unwrap();
161 return Ok(TransactionControl::Commit { id: tx_id.into() });
162 }
163 ConnectorProperties::MysqlCdc(_) => {
164 return Ok(TransactionControl::Commit { id: id.into() });
165 }
166 ConnectorProperties::SqlServerCdc(_) => {
167 return Ok(TransactionControl::Commit { id: id.into() });
168 }
169 _ => {}
170 },
171 _ => {}
172 }
173 }
174
175 Err(AccessError::Undefined {
176 name: "transaction status".into(),
177 path: TRANSACTION_STATUS.into(),
178 })
179}
180
/// Reads the object field `$field` of the JSONB value `$col` and converts it with the
/// `as_<$as_type>()` method (e.g. `string` -> `as_string()`).
///
/// # Panics
/// Panics if the field is absent or the conversion fails: both steps are `unwrap`ed.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        // `paste!` concatenates `as_` and `$as_type` into a single method identifier.
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}
188
189pub fn parse_schema_change(
193 accessor: &impl Access,
194 source_id: SourceId,
195 source_name: &str,
196 connector_props: &ConnectorProperties,
197) -> AccessResult<SchemaChangeEnvelope> {
198 let mut schema_changes = vec![];
199
200 let upstream_ddl: String = accessor
201 .access(&[UPSTREAM_DDL], &DataType::Varchar)?
202 .to_owned_datum()
203 .unwrap()
204 .as_utf8()
205 .to_string();
206
207 if let Some(ScalarRefImpl::List(table_changes)) = accessor
208 .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
209 .to_datum_ref()
210 {
211 for datum in table_changes.iter() {
212 let jsonb = match datum {
213 Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
214 _ => unreachable!(""),
215 };
216 let id: String = jsonb_access_field!(jsonb, "id", string);
217 let ty = jsonb_access_field!(jsonb, "type", string);
218
219 let table_name = id.trim_matches('"').to_owned();
220 let ddl_type: TableChangeType = ty.as_str().into();
221 if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
222 tracing::debug!("skip table schema change for create/drop command");
223 continue;
224 }
225
226 let mut column_descs: Vec<ColumnDesc> = vec![];
227 if let Some(table) = jsonb.access_object_field("table")
228 && let Some(columns) = table.access_object_field("columns")
229 {
230 for col in columns.array_elements().unwrap() {
231 let name = jsonb_access_field!(col, "name", string);
232 let type_name = jsonb_access_field!(col, "typeName", string);
233 let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
235 let data_type = match *connector_props {
236 ConnectorProperties::PostgresCdc(_) => {
237 let ty = type_name_to_pg_type(type_name.as_str());
238 if is_enum {
239 tracing::debug!(target: "auto_schema_change",
240 "Convert PostgreSQL user defined enum type '{}' to VARCHAR", type_name);
241 DataType::Varchar
242 } else {
243 match ty {
244 Some(ty) => match pg_type_to_rw_type(&ty) {
245 Ok(data_type) => data_type,
246 Err(err) => {
247 tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
248 return Err(AccessError::CdcAutoSchemaChangeError {
249 ty: type_name,
250 table_name: format!(
251 "{}.{}",
252 source_name, table_name
253 ),
254 });
255 }
256 },
257 None => {
258 return Err(AccessError::CdcAutoSchemaChangeError {
259 ty: type_name,
260 table_name: format!("{}.{}", source_name, table_name),
261 });
262 }
263 }
264 }
265 }
266 ConnectorProperties::MysqlCdc(_) => {
267 let ty = type_name_to_mysql_type(type_name.as_str());
268 match ty {
269 Some(ty) => match mysql_type_to_rw_type(&ty) {
270 Ok(data_type) => data_type,
271 Err(err) => {
272 tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
273 return Err(AccessError::CdcAutoSchemaChangeError {
274 ty: type_name,
275 table_name: format!("{}.{}", source_name, table_name),
276 });
277 }
278 },
279 None => {
280 return Err(AccessError::CdcAutoSchemaChangeError {
281 ty: type_name,
282 table_name: format!("{}.{}", source_name, table_name),
283 });
284 }
285 }
286 }
287 _ => {
288 unreachable!()
289 }
290 };
291
292 let column_desc = match col.access_object_field("defaultValueExpression") {
294 Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
295 let default_val_expr_str = default_val_expr_str.as_str().unwrap();
296 if is_bigserial_default(default_val_expr_str) {
298 tracing::warn!(target: "auto_schema_change",
299 "Ignoring unsupported BIGSERIAL default value expression: {}", default_val_expr_str);
300 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
301 } else {
302 let value_text: Option<String>;
303 match *connector_props {
304 ConnectorProperties::PostgresCdc(_) => {
305 match default_val_expr_str
308 .split("::")
309 .map(|s| s.trim_matches('\''))
310 .next()
311 {
312 None => {
313 value_text = None;
314 }
315 Some(val_text) => {
316 value_text = Some(val_text.to_owned());
317 }
318 }
319 }
320 ConnectorProperties::MysqlCdc(_) => {
321 if data_type == DataType::Timestamptz {
324 value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
325 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
326 AccessError::TypeError {
327 expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
328 got: data_type.to_string(),
329 value: default_val_expr_str.to_owned(),
330 }
331 })?);
332 } else {
333 value_text = Some(default_val_expr_str.to_owned());
334 }
335 }
336 _ => {
337 unreachable!("connector doesn't support schema change")
338 }
339 }
340
341 let snapshot_value: Datum = if let Some(value_text) = value_text {
342 Some(ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
343 |err| {
344 tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
345 AccessError::TypeError {
346 expected: "constant expression".into(),
347 got: data_type.to_string(),
348 value: value_text,
349 }
350 },
351 )?)
352 } else {
353 None
354 };
355
356 if snapshot_value.is_none() {
357 tracing::warn!(target: "auto_schema_change", "failed to parse default value expression: {}", default_val_expr_str);
358 ColumnDesc::named(name, ColumnId::placeholder(), data_type)
359 } else {
360 ColumnDesc::named_with_default_value(
361 name,
362 ColumnId::placeholder(),
363 data_type,
364 snapshot_value,
365 )
366 }
367 }
368 }
369 _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
370 };
371 column_descs.push(column_desc);
372 }
373 }
374
375 let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
377 schema_changes.push(TableSchemaChange {
378 cdc_table_id,
379 columns: column_descs
380 .into_iter()
381 .map(|column_desc| ColumnCatalog {
382 column_desc,
383 is_hidden: false,
384 })
385 .collect_vec(),
386 change_type: ty.as_str().into(),
387 upstream_ddl: upstream_ddl.clone(),
388 });
389 }
390
391 Ok(SchemaChangeEnvelope {
392 table_changes: schema_changes,
393 })
394 } else {
395 Err(AccessError::Undefined {
396 name: "table schema change".into(),
397 path: TABLE_CHANGES.into(),
398 })
399 }
400}
401
402impl<A> DebeziumChangeEvent<A>
403where
404 A: Access,
405{
406 pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
408 assert!(key_accessor.is_some() || value_accessor.is_some());
409 Self {
410 value_accessor,
411 key_accessor,
412 is_mongodb: false,
413 }
414 }
415
416 pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
417 assert!(key_accessor.is_some() || value_accessor.is_some());
418 Self {
419 value_accessor,
420 key_accessor,
421 is_mongodb: true,
422 }
423 }
424
425 pub(crate) fn transaction_control(
429 &self,
430 connector_props: &ConnectorProperties,
431 ) -> Option<TransactionControl> {
432 self.value_accessor
435 .as_ref()
436 .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
437 }
438}
439
440impl<A> ChangeEvent for DebeziumChangeEvent<A>
441where
442 A: Access,
443{
444 fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
445 match self.op()? {
446 ChangeEventOperation::Delete => {
447 if self.is_mongodb && desc.name == "_id" {
450 return self
451 .key_accessor
452 .as_ref()
453 .expect("key_accessor must be provided for delete operation")
454 .access(&[&desc.name], &desc.data_type);
455 }
456
457 if let Some(va) = self.value_accessor.as_ref() {
458 va.access(&[BEFORE, &desc.name], &desc.data_type)
459 } else {
460 self.key_accessor
461 .as_ref()
462 .unwrap()
463 .access(&[&desc.name], &desc.data_type)
464 }
465 }
466
467 ChangeEventOperation::Upsert => {
469 desc.additional_column.column_type.as_ref().map_or_else(
471 || {
472 self.value_accessor
473 .as_ref()
474 .expect("value_accessor must be provided for upsert operation")
475 .access(&[AFTER, &desc.name], &desc.data_type)
476 },
477 |additional_column_type| {
478 match *additional_column_type {
479 ColumnType::Timestamp(_) => {
480 let ts_ms = self
482 .value_accessor
483 .as_ref()
484 .expect("value_accessor must be provided for upsert operation")
485 .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
486 Ok(DatumCow::Owned(ts_ms.map(|scalar| {
487 Timestamptz::from_millis(scalar.into_int64())
488 .expect("source.ts_ms must in millisecond")
489 .to_scalar_value()
490 })))
491 }
492 ColumnType::DatabaseName(_) => self
493 .value_accessor
494 .as_ref()
495 .expect("value_accessor must be provided for upsert operation")
496 .access(&[SOURCE, SOURCE_DB], &desc.data_type),
497 ColumnType::SchemaName(_) => self
498 .value_accessor
499 .as_ref()
500 .expect("value_accessor must be provided for upsert operation")
501 .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
502 ColumnType::TableName(_) => self
503 .value_accessor
504 .as_ref()
505 .expect("value_accessor must be provided for upsert operation")
506 .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
507 ColumnType::CollectionName(_) => self
508 .value_accessor
509 .as_ref()
510 .expect("value_accessor must be provided for upsert operation")
511 .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
512 _ => Err(AccessError::UnsupportedAdditionalColumn {
513 name: desc.name.clone(),
514 }),
515 }
516 },
517 )
518 }
519 }
520 }
521
522 fn op(&self) -> Result<ChangeEventOperation, AccessError> {
523 if let Some(accessor) = &self.value_accessor {
524 if let Some(ScalarRefImpl::Utf8(op)) =
525 accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
526 {
527 match op {
528 DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
529 return Ok(ChangeEventOperation::Upsert);
530 }
531 DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
532 _ => (),
533 }
534 }
535 Err(super::AccessError::Undefined {
536 name: "op".into(),
537 path: Default::default(),
538 })
539 } else {
540 Ok(ChangeEventOperation::Delete)
541 }
542 }
543}
544
/// Wraps an accessor over a Debezium MongoDB event and decodes BSON (extended JSON) payloads
/// found under `after` / `before`.
pub struct MongoJsonAccess<A> {
    /// Underlying accessor over the raw event.
    accessor: A,
    /// When `true`, `after.<field>` / `before.<field>` paths are decoded into typed values with
    /// `extract_bson_field`. When `false`, `after.payload` / `before.payload` return the whole
    /// document as JSONB.
    strong_schema: bool,
}
552
553pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
554 let id_field = if let Some(value) = bson_doc.get("_id") {
555 value
556 } else {
557 bson_doc
558 };
559
560 let type_error = || AccessError::TypeError {
561 expected: id_type.to_string(),
562 got: match id_field {
563 serde_json::Value::Null => "null",
564 serde_json::Value::Bool(_) => "bool",
565 serde_json::Value::Number(_) => "number",
566 serde_json::Value::String(_) => "string",
567 serde_json::Value::Array(_) => "array",
568 serde_json::Value::Object(_) => "object",
569 }
570 .to_owned(),
571 value: id_field.to_string(),
572 };
573
574 let id: Datum = match id_type {
575 DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
576 DataType::Varchar => match id_field {
577 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
578 serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
579 obj["$oid"].as_str().unwrap_or_default().into(),
580 )),
581 _ => return Err(type_error()),
582 },
583 DataType::Int32 => {
584 if let serde_json::Value::Object(obj) = id_field
585 && obj.contains_key("$numberInt")
586 {
587 let int_str = obj["$numberInt"].as_str().unwrap_or_default();
588 Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
589 } else {
590 return Err(type_error());
591 }
592 }
593 DataType::Int64 => {
594 if let serde_json::Value::Object(obj) = id_field
595 && obj.contains_key("$numberLong")
596 {
597 let int_str = obj["$numberLong"].as_str().unwrap_or_default();
598 Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
599 } else {
600 return Err(type_error());
601 }
602 }
603 _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
604 };
605 Ok(id)
606}
607
608pub fn extract_bson_field(
621 type_expected: &DataType,
622 bson_doc: &serde_json::Value,
623 field: Option<&str>,
624) -> AccessResult {
625 let type_error = |datum: &serde_json::Value| AccessError::TypeError {
626 expected: type_expected.to_string(),
627 got: match bson_doc {
628 serde_json::Value::Null => "null",
629 serde_json::Value::Bool(_) => "bool",
630 serde_json::Value::Number(_) => "number",
631 serde_json::Value::String(_) => "string",
632 serde_json::Value::Array(_) => "array",
633 serde_json::Value::Object(_) => "object",
634 }
635 .to_owned(),
636 value: datum.to_string(),
637 };
638
639 let datum = if let Some(field) = field {
640 let Some(bson_doc) = bson_doc.get(field) else {
641 return Err(type_error(bson_doc));
642 };
643 bson_doc
644 } else {
645 bson_doc
646 };
647
648 if datum.is_null() {
649 return Ok(None);
650 }
651
652 let field_datum: Datum = match type_expected {
653 DataType::Boolean => {
654 if datum.is_boolean() {
655 Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
656 } else {
657 return Err(type_error(datum));
658 }
659 }
660 DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
661 DataType::Varchar => match datum {
662 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
663 serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
664 obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
665 }
666 _ => return Err(type_error(datum)),
667 },
668 DataType::Int16
669 | DataType::Int32
670 | DataType::Int64
671 | DataType::Int256
672 | DataType::Float32
673 | DataType::Float64 => {
674 if !datum.is_object() {
675 return Err(type_error(datum));
676 };
677
678 bson_extract_number(datum, type_expected)?
679 }
680
681 DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
682 if let serde_json::Value::Object(mp) = datum {
683 if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
684 bson_extract_timestamp(datum, type_expected)?
685 } else if mp.contains_key("$date") {
686 bson_extract_date(datum, type_expected)?
687 } else {
688 return Err(type_error(datum));
689 }
690 } else {
691 return Err(type_error(datum));
692 }
693 }
694 DataType::Decimal => {
695 if let serde_json::Value::Object(obj) = datum
696 && obj.contains_key("$numberDecimal")
697 && obj["$numberDecimal"].is_string()
698 {
699 let number = obj["$numberDecimal"].as_str().unwrap();
700
701 let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
702 AccessError::TypeError {
703 expected: type_expected.to_string(),
704 got: "unparsable string".into(),
705 value: number.to_owned(),
706 }
707 })?;
708 Some(ScalarImpl::Decimal(dec))
709 } else {
710 return Err(type_error(datum));
711 }
712 }
713
714 DataType::Bytea => {
715 if let serde_json::Value::Object(obj) = datum
716 && obj.contains_key("$binary")
717 && obj["$binary"].is_object()
718 {
719 use base64::Engine;
720
721 let binary = obj["$binary"].as_object().unwrap();
722
723 if !binary.contains_key("$base64")
724 || !binary["$base64"].is_string()
725 || !binary.contains_key("$subType")
726 || !binary["$subType"].is_string()
727 {
728 return Err(AccessError::TypeError {
729 expected: type_expected.to_string(),
730 got: "object".into(),
731 value: datum.to_string(),
732 });
733 }
734
735 let b64_str = binary["$base64"]
736 .as_str()
737 .ok_or_else(|| AccessError::TypeError {
738 expected: type_expected.to_string(),
739 got: "object".into(),
740 value: datum.to_string(),
741 })?;
742
743 let _type_str =
745 binary["$subType"]
746 .as_str()
747 .ok_or_else(|| AccessError::TypeError {
748 expected: type_expected.to_string(),
749 got: "object".into(),
750 value: datum.to_string(),
751 })?;
752
753 let bytes = base64::prelude::BASE64_STANDARD
754 .decode(b64_str)
755 .map_err(|_| AccessError::TypeError {
756 expected: "$binary object with $base64 string and $subType string field"
757 .to_owned(),
758 got: "string".to_owned(),
759 value: bson_doc.to_string(),
760 })?;
761 let bytea = ScalarImpl::Bytea(bytes.into());
762 Some(bytea)
763 } else {
764 return Err(type_error(datum));
765 }
766 }
767
768 DataType::Struct(struct_fields) => {
769 let mut datums = vec![];
770 for (field_name, field_type) in struct_fields.iter() {
771 let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
772 datums.push(field_datum);
773 }
774 let value = StructValue::new(datums);
775
776 Some(ScalarImpl::Struct(value))
777 }
778
779 DataType::List(list_type) => {
780 let elem_type = list_type.elem();
781 let Some(d_array) = datum.as_array() else {
782 return Err(type_error(datum));
783 };
784
785 let mut builder = elem_type.create_array_builder(d_array.len());
786 for item in d_array {
787 builder.append(extract_bson_field(elem_type, item, None)?);
788 }
789 Some(ScalarImpl::from(ListValue::new(builder.finish())))
790 }
791
792 _ => {
793 if let Some(field_name) = field {
794 unreachable!(
795 "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
796 )
797 } else {
798 let type_expected = type_expected.to_string();
799 unreachable!(
800 "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
801 )
802 }
803 }
804 };
805 Ok(field_datum)
806}
807
808fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
809 let field_name = match type_expected {
810 DataType::Int16 => "$numberInt",
811 DataType::Int32 => "$numberInt",
812 DataType::Int64 => "$numberLong",
813 DataType::Int256 => "$numberLong",
814 DataType::Float32 => "$numberDouble",
815 DataType::Float64 => "$numberDouble",
816 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
817 };
818
819 let datum = bson_doc.get(field_name);
820 if datum.is_none() {
821 return Err(AccessError::TypeError {
822 expected: type_expected.to_string(),
823 got: "object".into(),
824 value: bson_doc.to_string(),
825 });
826 }
827
828 let datum = datum.unwrap();
829
830 if datum.is_string() {
831 let Some(num_str) = datum.as_str() else {
832 return Err(AccessError::TypeError {
833 expected: type_expected.to_string(),
834 got: "string".into(),
835 value: datum.to_string(),
836 });
837 };
838 if [DataType::Float32, DataType::Float64].contains(type_expected) {
840 match (num_str, type_expected) {
841 ("Infinity", DataType::Float64) => {
842 return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
843 }
844 ("Infinity", DataType::Float32) => {
845 return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
846 }
847 ("-Infinity", DataType::Float64) => {
848 return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
849 }
850 ("-Infinity", DataType::Float32) => {
851 return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
852 }
853 ("NaN", DataType::Float64) => {
854 return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
855 }
856 ("NaN", DataType::Float32) => {
857 return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
858 }
859 _ => {}
860 }
861
862 let parsed_num: f64 = match num_str.parse() {
863 Ok(n) => n,
864 Err(_e) => {
865 return Err(AccessError::TypeError {
866 expected: type_expected.to_string(),
867 got: "string".into(),
868 value: num_str.to_owned(),
869 });
870 }
871 };
872 if *type_expected == DataType::Float64 {
873 return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
874 } else {
875 let parsed_num = parsed_num as f32;
876 return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
877 }
878 }
879 if *type_expected == DataType::Int256 {
881 let parsed_num = match Int256::from_str(num_str) {
882 Ok(n) => n,
883 Err(_) => {
884 return Err(AccessError::TypeError {
885 expected: type_expected.to_string(),
886 got: "string".into(),
887 value: num_str.to_owned(),
888 });
889 }
890 };
891 return Ok(Some(ScalarImpl::Int256(parsed_num)));
892 }
893
894 let parsed_num: i64 = match num_str.parse() {
896 Ok(n) => n,
897 Err(_e) => {
898 return Err(AccessError::TypeError {
899 expected: type_expected.to_string(),
900 got: "string".into(),
901 value: num_str.to_owned(),
902 });
903 }
904 };
905 match type_expected {
906 DataType::Int16 => {
907 if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
908 return Err(AccessError::TypeError {
909 expected: type_expected.to_string(),
910 got: "string".into(),
911 value: num_str.to_owned(),
912 });
913 }
914 return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
915 }
916 DataType::Int32 => {
917 if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
918 return Err(AccessError::TypeError {
919 expected: type_expected.to_string(),
920 got: "string".into(),
921 value: num_str.to_owned(),
922 });
923 }
924 return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
925 }
926 DataType::Int64 => {
927 return Ok(Some(ScalarImpl::Int64(parsed_num)));
928 }
929 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
930 }
931 }
932 if datum.is_null() {
933 return Err(AccessError::TypeError {
934 expected: type_expected.to_string(),
935 got: "null".into(),
936 value: bson_doc.to_string(),
937 });
938 }
939
940 if datum.is_array() {
941 return Err(AccessError::TypeError {
942 expected: type_expected.to_string(),
943 got: "array".to_owned(),
944 value: datum.to_string(),
945 });
946 }
947
948 if datum.is_object() {
949 return Err(AccessError::TypeError {
950 expected: type_expected.to_string(),
951 got: "object".to_owned(),
952 value: datum.to_string(),
953 });
954 }
955
956 if datum.is_boolean() {
957 return Err(AccessError::TypeError {
958 expected: type_expected.to_string(),
959 got: "boolean".into(),
960 value: bson_doc.to_string(),
961 });
962 }
963
964 if datum.is_number() {
965 let got_type = if datum.is_f64() { "f64" } else { "i64" };
966 return Err(AccessError::TypeError {
967 expected: type_expected.to_string(),
968 got: got_type.into(),
969 value: bson_doc.to_string(),
970 });
971 }
972
973 Err(AccessError::TypeError {
974 expected: type_expected.to_string(),
975 got: "unknown".into(),
976 value: bson_doc.to_string(),
977 })
978}
979
980fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
981 let datum = &bson_doc["$date"];
996
997 let type_error = || AccessError::TypeError {
998 expected: type_expected.to_string(),
999 got: match bson_doc {
1000 serde_json::Value::Null => "null",
1001 serde_json::Value::Bool(_) => "bool",
1002 serde_json::Value::Number(_) => "number",
1003 serde_json::Value::String(_) => "string",
1004 serde_json::Value::Array(_) => "array",
1005 serde_json::Value::Object(_) => "object",
1006 }
1007 .to_owned(),
1008 value: datum.to_string(),
1009 };
1010
1011 let millis = match datum {
1013 serde_json::Value::Object(obj)
1015 if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1016 {
1017 obj["$numberLong"]
1018 .as_str()
1019 .unwrap()
1020 .parse::<i64>()
1021 .map_err(|_| AccessError::TypeError {
1022 expected: "timestamp".into(),
1023 got: "object".into(),
1024 value: datum.to_string(),
1025 })?
1026 }
1027 serde_json::Value::String(s) => {
1029 let dt =
1030 chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1031 expected: "valid ISO-8601 date string".into(),
1032 got: "string".into(),
1033 value: datum.to_string(),
1034 })?;
1035 dt.timestamp_millis()
1036 }
1037
1038 serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1041 expected: "timestamp".into(),
1042 got: "number".into(),
1043 value: datum.to_string(),
1044 })?,
1045
1046 _ => return Err(type_error()),
1047 };
1048
1049 let datetime =
1050 chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1051 expected: "timestamp".into(),
1052 got: "object".into(),
1053 value: datum.to_string(),
1054 })?;
1055
1056 let res = match type_expected {
1057 DataType::Date => {
1058 let naive = datetime.naive_local();
1059 let dt = naive.date();
1060 Some(ScalarImpl::Date(dt.into()))
1061 }
1062 DataType::Time => {
1063 let naive = datetime.naive_local();
1064 let dt = naive.time();
1065 Some(ScalarImpl::Time(dt.into()))
1066 }
1067 DataType::Timestamp => {
1068 let naive = datetime.naive_local();
1069 let dt = Timestamp::from(naive);
1070 Some(ScalarImpl::Timestamp(dt))
1071 }
1072 DataType::Timestamptz => {
1073 let dt = datetime.into();
1074 Some(ScalarImpl::Timestamptz(dt))
1075 }
1076 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1077 };
1078 Ok(res)
1079}
1080
1081fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1082 let Some(obj) = bson_doc["$timestamp"].as_object() else {
1100 return Err(AccessError::TypeError {
1101 expected: "timestamp".into(),
1102 got: "object".into(),
1103 value: bson_doc.to_string(),
1104 });
1105 };
1106
1107 if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1108 {
1109 return Err(AccessError::TypeError {
1110 expected: "timestamp with valid seconds since epoch".into(),
1111 got: "object".into(),
1112 value: bson_doc.to_string(),
1113 });
1114 }
1115
1116 let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1117 expected: "timestamp with valid seconds since epoch".into(),
1118 got: "object".into(),
1119 value: bson_doc.to_string(),
1120 })?;
1121
1122 let chrono_datetime =
1123 chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1124 expected: type_expected.to_string(),
1125 got: "object".to_owned(),
1126 value: bson_doc.to_string(),
1127 })?;
1128
1129 let res = match type_expected {
1130 DataType::Date => {
1131 let naive = chrono_datetime.naive_local();
1132 let dt = naive.date();
1133 Some(ScalarImpl::Date(dt.into()))
1134 }
1135 DataType::Time => {
1136 let naive = chrono_datetime.naive_local();
1137 let dt = naive.time();
1138 Some(ScalarImpl::Time(dt.into()))
1139 }
1140 DataType::Timestamp => {
1141 let naive = chrono_datetime.naive_local();
1142 let dt = Timestamp::from(naive);
1143 Some(ScalarImpl::Timestamp(dt))
1144 }
1145 DataType::Timestamptz => {
1146 let dt = chrono_datetime.into();
1147 Some(ScalarImpl::Timestamptz(dt))
1148 }
1149 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1150 };
1151
1152 Ok(res)
1153}
1154
1155impl<A> MongoJsonAccess<A> {
1156 pub fn new(accessor: A, strong_schema: bool) -> Self {
1157 Self {
1158 accessor,
1159 strong_schema,
1160 }
1161 }
1162}
1163
1164impl<A> Access for MongoJsonAccess<A>
1165where
1166 A: Access,
1167{
1168 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1169 match path {
1170 ["after" | "before", "_id"] => {
1171 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1172 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1173 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1174 } else {
1175 Err(AccessError::Undefined {
1177 name: "_id".to_owned(),
1178 path: path[0].to_owned(),
1179 })?
1180 }
1181 }
1182
1183 ["after" | "before", "payload"] if !self.strong_schema => {
1184 self.access(&[path[0]], &DataType::Jsonb)
1185 }
1186
1187 ["after" | "before", field] if self.strong_schema => {
1188 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1189 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1190 Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1191 } else {
1192 Err(AccessError::Undefined {
1194 name: field.to_string(),
1195 path: path[0].to_owned(),
1196 })?
1197 }
1198 }
1199
1200 ["_id"] => {
1204 let ret = self.accessor.access(path, type_expected);
1205 if matches!(ret, Err(AccessError::Undefined { .. })) {
1206 let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1207 if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1208 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1209 } else {
1210 Err(AccessError::Undefined {
1212 name: "_id".to_owned(),
1213 path: "id".to_owned(),
1214 })?
1215 }
1216 } else {
1217 ret
1218 }
1219 }
1220 _ => self.accessor.access(path, type_expected),
1221 }
1222 }
1223}