1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
19use risingwave_common::id::SourceId;
20use risingwave_common::types::{
21 DataType, Datum, DatumCow, Int256, ListValue, Scalar, ScalarImpl, ScalarRefImpl, StructValue,
22 Timestamp, Timestamptz, ToDatumRef, ToOwnedDatum,
23};
24use risingwave_connector_codec::decoder::AccessExt;
25use risingwave_pb::plan_common::additional_column::ColumnType;
26use thiserror_ext::AsReport;
27
28use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
29
/// Numeric type codes compared against the `jdbcType` field of a column in a
/// Debezium schema change message (see `parse_schema_change`).
///
/// NOTE(review): the values coincide with `java.sql.Types.STRUCT` / `ARRAY`;
/// confirm against the Debezium version in use.
mod debezium_sql_types {
    /// Code identifying a composite (struct-like) column type.
    pub const STRUCT: i32 = 2002;
    /// Code identifying an array column type.
    pub const ARRAY: i32 = 2003;
}
35use crate::connector_common::{create_pg_client_from_properties, discover_pgvector_dimensions};
36use crate::parser::TransactionControl;
37use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
38use crate::parser::schema_change::TableChangeType;
39use crate::source::cdc::build_cdc_table_id;
40use crate::source::cdc::external::mysql::{
41 mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
42};
43use crate::source::cdc::external::postgres::{pg_type_to_rw_type, type_name_to_pg_type};
44use crate::source::{ConnectorProperties, SourceColumnDesc};
45
/// Extracts the `(schema, table)` pair from a Debezium table identifier.
///
/// Two spellings are understood:
///
/// * quoted components separated by `"."`, e.g. `"db"."schema"."table"`;
///   dots inside a quoted component are preserved;
/// * plain dot-separated components, e.g. `db.schema.table`, optionally
///   wrapped in one pair of surrounding quotes.
///
/// In both cases the last two components are returned. `None` is returned
/// when fewer than two components exist or either of the last two is empty
/// after trimming. If the quoted form is detected but yields an empty
/// component, the plain dot-separated interpretation is tried as a fallback.
fn parse_schema_table_from_debezium_id(id: &str) -> Option<(String, String)> {
    /// Returns the last two items of `parts` as owned strings, provided both
    /// exist and are non-empty. Runs in a single pass without allocating an
    /// intermediate collection.
    fn last_two<'a>(parts: impl Iterator<Item = &'a str>) -> Option<(String, String)> {
        let mut schema = None;
        let mut table = None;
        for part in parts {
            schema = table;
            table = Some(part);
        }
        match (schema, table) {
            (Some(schema), Some(table)) if !schema.is_empty() && !table.is_empty() => {
                Some((schema.to_owned(), table.to_owned()))
            }
            _ => None,
        }
    }

    let trimmed = id.trim();

    // Quoted form first: splitting on `"."` keeps dots that are part of a
    // quoted name intact.
    if trimmed.contains("\".\"") {
        let quoted_parts = trimmed
            .split("\".\"")
            .map(|part| part.trim_matches('"').trim());
        if let Some(pair) = last_two(quoted_parts) {
            return Some(pair);
        }
    }

    // Plain form: strip surrounding quotes, then split on every dot.
    last_two(trimmed.trim_matches('"').split('.').map(str::trim))
}
87
88async fn fetch_pgvector_dimensions_for_table(
89 connector_props: &ConnectorProperties,
90 schema: &str,
91 table: &str,
92) -> AccessResult<std::collections::HashMap<String, usize>> {
93 let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
94 return Ok(std::collections::HashMap::new());
95 };
96
97 let client = create_pg_client_from_properties(&cdc_props.properties, None)
98 .await
99 .map_err(|err| AccessError::Uncategorized {
100 message: format!(
101 "failed to connect upstream postgres for schema change lookup: {}",
102 err.as_report()
103 ),
104 })?;
105
106 discover_pgvector_dimensions(&client, schema, table)
107 .await
108 .map_err(|err| AccessError::Uncategorized {
109 message: format!(
110 "failed to query upstream postgres schema for {schema}.{table}: {}",
111 err.as_report()
112 ),
113 })
114}
115
116async fn can_fallback_array_to_varchar(
132 connector_props: &ConnectorProperties,
133 array_type_name: &str,
134) -> AccessResult<bool> {
135 let ConnectorProperties::PostgresCdc(cdc_props) = connector_props else {
136 return Ok(false);
137 };
138
139 let client = create_pg_client_from_properties(&cdc_props.properties, None)
140 .await
141 .map_err(|err| AccessError::Uncategorized {
142 message: format!(
143 "failed to connect upstream postgres for schema change lookup: {}",
144 err.as_report()
145 ),
146 })?;
147
148 let row = client
149 .query_opt(
150 "SELECT t_elem.typtype = 'e' \
151 FROM pg_type t \
152 JOIN pg_type t_elem ON t.typelem = t_elem.oid \
153 WHERE t.typname = $1 \
154 LIMIT 1",
155 &[&array_type_name],
156 )
157 .await
158 .map_err(|err| AccessError::Uncategorized {
159 message: format!(
160 "failed to query upstream postgres for array element type of `{array_type_name}`: {}",
161 err.as_report()
162 ),
163 })?;
164
165 Ok(row.map(|r| r.get::<_, bool>(0)).unwrap_or(false))
166}
167
/// A single Debezium change event, viewed through accessors over its key and
/// value payloads.
///
/// The constructors in this file assert that at least one of the two
/// accessors is present.
pub struct DebeziumChangeEvent<A> {
    /// Accessor over the message value; when absent, `op()` treats the event
    /// as a delete.
    value_accessor: Option<A>,
    /// Accessor over the message key; used for deletes when the value is
    /// absent and for the MongoDB `_id` column.
    key_accessor: Option<A>,
    /// Set by `new_mongodb_event`; enables the MongoDB-specific `_id`
    /// handling on delete.
    is_mongodb: bool,
}
206
/// Value field read for row data on delete events.
const BEFORE: &str = "before";
/// Value field read for row data on upsert events.
const AFTER: &str = "after";

/// Field of a schema change message holding the upstream DDL text.
const UPSTREAM_DDL: &str = "ddl";
/// Value field under which the source metadata below is looked up.
const SOURCE: &str = "source";
/// Sub-field of `source` read as an `Int64` and converted with
/// `Timestamptz::from_millis`.
const SOURCE_TS_MS: &str = "ts_ms";
/// Sub-field of `source` backing the database-name additional column.
const SOURCE_DB: &str = "db";
/// Sub-field of `source` backing the schema-name additional column.
const SOURCE_SCHEMA: &str = "schema";
/// Sub-field of `source` backing the table-name additional column.
const SOURCE_TABLE: &str = "table";
/// Sub-field of `source` backing the collection-name additional column.
const SOURCE_COLLECTION: &str = "collection";

/// Value field holding the operation code (see the `DEBEZIUM_*_OP` constants).
const OP: &str = "op";
/// Field of a transaction metadata message holding its status.
pub const TRANSACTION_STATUS: &str = "status";
/// Field of a transaction metadata message holding the transaction id.
pub const TRANSACTION_ID: &str = "id";

/// Field of a schema change message holding the list of table changes.
pub const TABLE_CHANGES: &str = "tableChanges";

/// Operation code mapped to an upsert by `op()`.
pub const DEBEZIUM_READ_OP: &str = "r";
/// Operation code mapped to an upsert by `op()`.
pub const DEBEZIUM_CREATE_OP: &str = "c";
/// Operation code mapped to an upsert by `op()`.
pub const DEBEZIUM_UPDATE_OP: &str = "u";
/// Operation code mapped to a delete by `op()`.
pub const DEBEZIUM_DELETE_OP: &str = "d";

/// Transaction status value mapped to `TransactionControl::Begin`.
pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN";
/// Transaction status value mapped to `TransactionControl::Commit`.
pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END";
231
232pub fn parse_transaction_meta(
233 accessor: &impl Access,
234 connector_props: &ConnectorProperties,
235) -> AccessResult<TransactionControl> {
236 if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
237 accessor
238 .access(&[TRANSACTION_STATUS], &DataType::Varchar)?
239 .to_datum_ref(),
240 accessor
241 .access(&[TRANSACTION_ID], &DataType::Varchar)?
242 .to_datum_ref(),
243 ) {
244 match status {
249 DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props {
250 ConnectorProperties::PostgresCdc(_) => {
251 let (tx_id, _) = id.split_once(':').unwrap();
252 return Ok(TransactionControl::Begin { id: tx_id.into() });
253 }
254 ConnectorProperties::MysqlCdc(_) => {
255 return Ok(TransactionControl::Begin { id: id.into() });
256 }
257 ConnectorProperties::SqlServerCdc(_) => {
258 return Ok(TransactionControl::Begin { id: id.into() });
259 }
260 _ => {}
261 },
262 DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props {
263 ConnectorProperties::PostgresCdc(_) => {
264 let (tx_id, _) = id.split_once(':').unwrap();
265 return Ok(TransactionControl::Commit { id: tx_id.into() });
266 }
267 ConnectorProperties::MysqlCdc(_) => {
268 return Ok(TransactionControl::Commit { id: id.into() });
269 }
270 ConnectorProperties::SqlServerCdc(_) => {
271 return Ok(TransactionControl::Commit { id: id.into() });
272 }
273 _ => {}
274 },
275 _ => {}
276 }
277 }
278
279 Err(AccessError::Undefined {
280 name: "transaction status".into(),
281 path: TRANSACTION_STATUS.into(),
282 })
283}
284
/// Reads field `$field` of the JSONB object `$col` and converts it with the
/// `as_<$as_type>` accessor (for example `string` selects `as_string`).
///
/// # Panics
///
/// Expands to two `unwrap()` calls: it panics when the field is absent and
/// when the conversion fails.
macro_rules! jsonb_access_field {
    ($col:expr, $field:expr, $as_type:tt) => {
        $crate::paste! {
            $col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
        }
    };
}
292
/// Parses a Debezium schema change message into a [`SchemaChangeEnvelope`].
///
/// The message's `ddl` text and `tableChanges` list are read through
/// `accessor`. Table changes of type create/drop are skipped. For every other
/// change, each entry of `table.columns` is converted into a [`ColumnDesc`]:
///
/// * **Postgres CDC** — composite types (`jdbcType` equal to
///   `debezium_sql_types::STRUCT`) and enums (non-null `enumValues`) become
///   `VARCHAR`; `vector` columns get their dimension from the upstream
///   database; other type names go through `type_name_to_pg_type` /
///   `pg_type_to_rw_type`; an unknown type whose `jdbcType` is
///   `debezium_sql_types::ARRAY` becomes `VARCHAR[]` when
///   `can_fallback_array_to_varchar` approves.
/// * **MySQL CDC** — type names go through `type_name_to_mysql_type` /
///   `mysql_type_to_rw_type`.
///
/// A non-null `defaultValueExpression` is turned into a default value when it
/// parses as a constant of the column type; otherwise a warning is logged and
/// the column is created without a default.
///
/// # Errors
///
/// * `AccessError::Undefined` when `tableChanges` is absent or not a list.
/// * `AccessError::CdcAutoSchemaChangeError` when a column type cannot be
///   mapped.
/// * `AccessError::TypeError` when a MySQL timestamp default cannot be
///   converted.
/// * Errors propagated from the upstream Postgres lookups.
///
/// # Panics
///
/// Panics when `ddl` is null, when a table change is not JSONB, when a
/// required JSON field (`id`, `type`, column `name` / `typeName`) is missing
/// or has the wrong type, when `columns` is not an array, when a non-null
/// `defaultValueExpression` is not a string, and when the connector is
/// neither Postgres CDC nor MySQL CDC.
pub async fn parse_schema_change(
    accessor: &impl Access,
    source_id: SourceId,
    source_name: &str,
    connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
    let mut schema_changes = vec![];
    // Upstream pgvector dimensions keyed by (schema, table), so each table is
    // queried at most once per message.
    let mut pgvector_dims_cache: std::collections::HashMap<
        (String, String),
        std::collections::HashMap<String, usize>,
    > = std::collections::HashMap::new();

    // The DDL text is attached to every emitted table change and to the
    // default-value warning below. Panics if the field is null.
    let upstream_ddl: String = accessor
        .access(&[UPSTREAM_DDL], &DataType::Varchar)?
        .to_owned_datum()
        .unwrap()
        .as_utf8()
        .to_string();

    if let Some(ScalarRefImpl::List(table_changes)) = accessor
        .access(&[TABLE_CHANGES], &DataType::Jsonb.list())?
        .to_datum_ref()
    {
        for datum in table_changes.iter() {
            let jsonb = match datum {
                Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb,
                _ => unreachable!(""),
            };
            let id: String = jsonb_access_field!(jsonb, "id", string);
            let ty = jsonb_access_field!(jsonb, "type", string);

            // `table_name` (outer quotes stripped) is only used in error messages.
            let table_name = id.trim_matches('"').to_owned();
            let ddl_type: TableChangeType = ty.as_str().into();
            if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
                tracing::debug!("skip table schema change for create/drop command");
                continue;
            }

            let mut column_descs: Vec<ColumnDesc> = vec![];
            if let Some(table) = jsonb.access_object_field("table")
                && let Some(columns) = table.access_object_field("columns")
            {
                for col in columns.array_elements().unwrap() {
                    let name = jsonb_access_field!(col, "name", string);
                    let type_name = jsonb_access_field!(col, "typeName", string);
                    // A column counts as an enum when `enumValues` is present and non-null.
                    let is_enum = matches!(col.access_object_field("enumValues"), Some(val) if !val.is_jsonb_null());
                    // `jdbcType` is optional; a missing or non-numeric value yields `None`.
                    let jdbc_type = col
                        .access_object_field("jdbcType")
                        .and_then(|v| v.as_number().ok())
                        .map(|n| n.0 as i32);
                    let is_composite = jdbc_type == Some(debezium_sql_types::STRUCT);

                    let data_type = match *connector_props {
                        ConnectorProperties::PostgresCdc(_) => {
                            if is_composite || is_enum {
                                tracing::debug!(target: "auto_schema_change",
                                    "Convert PostgreSQL user-defined type '{}' ({}) to VARCHAR",
                                    type_name,
                                    if is_composite { "composite" } else { "enum" });
                                DataType::Varchar
                            } else if type_name.eq_ignore_ascii_case("vector") {
                                // The dimension is looked up upstream, which
                                // requires the schema and table names
                                // extracted from `id`.
                                let Some((schema_name, table_name_only)) =
                                    parse_schema_table_from_debezium_id(id.as_str())
                                else {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                };

                                let cache_key = (schema_name, table_name_only);
                                // Query upstream only on the first vector column of a table.
                                if !pgvector_dims_cache.contains_key(&cache_key) {
                                    let fetched = fetch_pgvector_dimensions_for_table(
                                        connector_props,
                                        &cache_key.0,
                                        &cache_key.1,
                                    )
                                    .await?;
                                    pgvector_dims_cache.insert(cache_key.clone(), fetched);
                                }

                                match pgvector_dims_cache
                                    .get(&cache_key)
                                    .and_then(|m| m.get(name.as_str()).copied())
                                {
                                    Some(dim) if (1..=DataType::VEC_MAX_SIZE).contains(&dim) => {
                                        DataType::Vector(dim)
                                    }
                                    // Dimension unknown or out of the supported range.
                                    _ => {
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            } else {
                                let ty = type_name_to_pg_type(type_name.as_str());
                                match ty {
                                    Some(ty) => match pg_type_to_rw_type(&ty) {
                                        Ok(data_type) => data_type,
                                        Err(err) => {
                                            tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
                                            return Err(AccessError::CdcAutoSchemaChangeError {
                                                ty: type_name,
                                                table_name: format!(
                                                    "{}.{}",
                                                    source_name, table_name
                                                ),
                                            });
                                        }
                                    },
                                    // Unknown type name reported as an array:
                                    // the upstream lookup in the guard runs
                                    // only when `jdbcType` is ARRAY.
                                    None if jdbc_type == Some(debezium_sql_types::ARRAY)
                                        && can_fallback_array_to_varchar(
                                            connector_props,
                                            type_name.as_str(),
                                        )
                                        .await? =>
                                    {
                                        tracing::debug!(target: "auto_schema_change",
                                            "Fall back PostgreSQL array type '{}' to VARCHAR[]",
                                            type_name);
                                        DataType::Varchar.list()
                                    }
                                    None => {
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                }
                            }
                        }
                        ConnectorProperties::MysqlCdc(_) => {
                            let ty = type_name_to_mysql_type(type_name.as_str());
                            match ty {
                                Some(ty) => match mysql_type_to_rw_type(&ty) {
                                    Ok(data_type) => data_type,
                                    Err(err) => {
                                        tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
                                        return Err(AccessError::CdcAutoSchemaChangeError {
                                            ty: type_name,
                                            table_name: format!("{}.{}", source_name, table_name),
                                        });
                                    }
                                },
                                None => {
                                    return Err(AccessError::CdcAutoSchemaChangeError {
                                        ty: type_name,
                                        table_name: format!("{}.{}", source_name, table_name),
                                    });
                                }
                            }
                        }
                        // Only the Postgres and MySQL connectors are handled here.
                        _ => {
                            unreachable!()
                        }
                    };

                    let column_desc = match col.access_object_field("defaultValueExpression") {
                        Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
                            let default_val_expr_str = default_val_expr_str.as_str().unwrap();
                            let value_text: Option<String>;
                            match *connector_props {
                                ConnectorProperties::PostgresCdc(_) => {
                                    // Keep the text before the first `::` and
                                    // strip surrounding single quotes.
                                    match default_val_expr_str
                                        .split("::")
                                        .map(|s| s.trim_matches('\''))
                                        .next()
                                    {
                                        None => {
                                            value_text = None;
                                        }
                                        Some(val_text) => {
                                            value_text = Some(val_text.to_owned());
                                        }
                                    }
                                }
                                ConnectorProperties::MysqlCdc(_) => {
                                    if data_type == DataType::Timestamptz {
                                        // Timestamp defaults are rewritten by
                                        // `timestamp_val_to_timestamptz` before parsing.
                                        value_text = Some(timestamp_val_to_timestamptz(default_val_expr_str).map_err(|err| {
                                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
                                            AccessError::TypeError {
                                                expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
                                                got: data_type.to_string(),
                                                value: default_val_expr_str.to_owned(),
                                            }
                                        })?);
                                    } else {
                                        value_text = Some(default_val_expr_str.to_owned());
                                    }
                                }
                                _ => {
                                    unreachable!("connector doesn't support schema change")
                                }
                            }

                            // Text that does not parse as a constant of the
                            // column type results in no default (plus a warning).
                            let snapshot_value: Datum = value_text.and_then(|value_text| {
                                ScalarImpl::from_text(value_text.as_str(), &data_type)
                                    .inspect_err(|err| {
                                        tracing::warn!(
                                            target: "auto_schema_change",
                                            error = %err.as_report(),
                                            column = %name,
                                            data_type = %data_type,
                                            default_value_expression = default_val_expr_str,
                                            upstream_ddl = %upstream_ddl,
                                            "non-constant default expression, column added without default. \
                                            If this column is not newly added by this schema change, it is safe to ignore this warning. \
                                            If this column is newly added by this schema change, existing rows will be NULL in this column — consider using COALESCE in queries to provide a fallback value."
                                        );
                                    })
                                    .ok()
                            });

                            if snapshot_value.is_none() {
                                ColumnDesc::named(name, ColumnId::placeholder(), data_type)
                            } else {
                                ColumnDesc::named_with_default_value(
                                    name,
                                    ColumnId::placeholder(),
                                    data_type,
                                    snapshot_value,
                                )
                            }
                        }
                        // Absent or null default expression.
                        _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
                    };
                    column_descs.push(column_desc);
                }
            }

            // The CDC table id is built from `id` with every double quote removed.
            let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
            schema_changes.push(TableSchemaChange {
                cdc_table_id,
                columns: column_descs
                    .into_iter()
                    .map(|column_desc| ColumnCatalog {
                        column_desc,
                        is_hidden: false,
                    })
                    .collect_vec(),
                change_type: ty.as_str().into(),
                upstream_ddl: upstream_ddl.clone(),
            });
        }

        Ok(SchemaChangeEnvelope {
            table_changes: schema_changes,
        })
    } else {
        Err(AccessError::Undefined {
            name: "table schema change".into(),
            path: TABLE_CHANGES.into(),
        })
    }
}
592
593impl<A> DebeziumChangeEvent<A>
594where
595 A: Access,
596{
597 pub fn new(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
599 assert!(key_accessor.is_some() || value_accessor.is_some());
600 Self {
601 value_accessor,
602 key_accessor,
603 is_mongodb: false,
604 }
605 }
606
607 pub fn new_mongodb_event(key_accessor: Option<A>, value_accessor: Option<A>) -> Self {
608 assert!(key_accessor.is_some() || value_accessor.is_some());
609 Self {
610 value_accessor,
611 key_accessor,
612 is_mongodb: true,
613 }
614 }
615
616 pub(crate) fn transaction_control(
620 &self,
621 connector_props: &ConnectorProperties,
622 ) -> Option<TransactionControl> {
623 self.value_accessor
626 .as_ref()
627 .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok())
628 }
629}
630
631impl<A> ChangeEvent for DebeziumChangeEvent<A>
632where
633 A: Access,
634{
635 fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
636 match self.op()? {
637 ChangeEventOperation::Delete => {
638 if self.is_mongodb && desc.name == "_id" {
641 return self
642 .key_accessor
643 .as_ref()
644 .expect("key_accessor must be provided for delete operation")
645 .access(&[&desc.name], &desc.data_type);
646 }
647
648 if let Some(va) = self.value_accessor.as_ref() {
649 va.access(&[BEFORE, &desc.name], &desc.data_type)
650 } else {
651 self.key_accessor
652 .as_ref()
653 .unwrap()
654 .access(&[&desc.name], &desc.data_type)
655 }
656 }
657
658 ChangeEventOperation::Upsert => {
660 desc.additional_column.column_type.as_ref().map_or_else(
662 || {
663 self.value_accessor
664 .as_ref()
665 .expect("value_accessor must be provided for upsert operation")
666 .access(&[AFTER, &desc.name], &desc.data_type)
667 },
668 |additional_column_type| {
669 match *additional_column_type {
670 ColumnType::Timestamp(_) => {
671 let ts_ms = self
673 .value_accessor
674 .as_ref()
675 .expect("value_accessor must be provided for upsert operation")
676 .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
677 Ok(DatumCow::Owned(ts_ms.map(|scalar| {
678 Timestamptz::from_millis(scalar.into_int64())
679 .expect("source.ts_ms must in millisecond")
680 .to_scalar_value()
681 })))
682 }
683 ColumnType::DatabaseName(_) => self
684 .value_accessor
685 .as_ref()
686 .expect("value_accessor must be provided for upsert operation")
687 .access(&[SOURCE, SOURCE_DB], &desc.data_type),
688 ColumnType::SchemaName(_) => self
689 .value_accessor
690 .as_ref()
691 .expect("value_accessor must be provided for upsert operation")
692 .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
693 ColumnType::TableName(_) => self
694 .value_accessor
695 .as_ref()
696 .expect("value_accessor must be provided for upsert operation")
697 .access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
698 ColumnType::CollectionName(_) => self
699 .value_accessor
700 .as_ref()
701 .expect("value_accessor must be provided for upsert operation")
702 .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
703 _ => Err(AccessError::UnsupportedAdditionalColumn {
704 name: desc.name.clone(),
705 }),
706 }
707 },
708 )
709 }
710 }
711 }
712
713 fn op(&self) -> Result<ChangeEventOperation, AccessError> {
714 if let Some(accessor) = &self.value_accessor {
715 if let Some(ScalarRefImpl::Utf8(op)) =
716 accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
717 {
718 match op {
719 DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
720 return Ok(ChangeEventOperation::Upsert);
721 }
722 DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete),
723 _ => (),
724 }
725 }
726 Err(super::AccessError::Undefined {
727 name: "op".into(),
728 path: Default::default(),
729 })
730 } else {
731 Ok(ChangeEventOperation::Delete)
732 }
733 }
734}
735
/// Wrapper around an inner accessor `A`.
///
/// NOTE(review): presumably used for Debezium MongoDB JSON payloads, going
/// by the name; its `Access` implementation is outside this view — confirm.
pub struct MongoJsonAccess<A> {
    /// The wrapped accessor.
    accessor: A,
    /// NOTE(review): presumably selects typed-column (strong schema) decoding
    /// over a schemaless payload — confirm against the implementation.
    strong_schema: bool,
}
743
744pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult {
745 let id_field = if let Some(value) = bson_doc.get("_id") {
746 value
747 } else {
748 bson_doc
749 };
750
751 let type_error = || AccessError::TypeError {
752 expected: id_type.to_string(),
753 got: match id_field {
754 serde_json::Value::Null => "null",
755 serde_json::Value::Bool(_) => "bool",
756 serde_json::Value::Number(_) => "number",
757 serde_json::Value::String(_) => "string",
758 serde_json::Value::Array(_) => "array",
759 serde_json::Value::Object(_) => "object",
760 }
761 .to_owned(),
762 value: id_field.to_string(),
763 };
764
765 let id: Datum = match id_type {
766 DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
767 DataType::Varchar => match id_field {
768 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
769 serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
770 obj["$oid"].as_str().unwrap_or_default().into(),
771 )),
772 _ => return Err(type_error()),
773 },
774 DataType::Int32 => {
775 if let serde_json::Value::Object(obj) = id_field
776 && obj.contains_key("$numberInt")
777 {
778 let int_str = obj["$numberInt"].as_str().unwrap_or_default();
779 Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
780 } else {
781 return Err(type_error());
782 }
783 }
784 DataType::Int64 => {
785 if let serde_json::Value::Object(obj) = id_field
786 && obj.contains_key("$numberLong")
787 {
788 let int_str = obj["$numberLong"].as_str().unwrap_or_default();
789 Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
790 } else {
791 return Err(type_error());
792 }
793 }
794 _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
795 };
796 Ok(id)
797}
798
799pub fn extract_bson_field(
812 type_expected: &DataType,
813 bson_doc: &serde_json::Value,
814 field: Option<&str>,
815) -> AccessResult {
816 let type_error = |datum: &serde_json::Value| AccessError::TypeError {
817 expected: type_expected.to_string(),
818 got: match bson_doc {
819 serde_json::Value::Null => "null",
820 serde_json::Value::Bool(_) => "bool",
821 serde_json::Value::Number(_) => "number",
822 serde_json::Value::String(_) => "string",
823 serde_json::Value::Array(_) => "array",
824 serde_json::Value::Object(_) => "object",
825 }
826 .to_owned(),
827 value: datum.to_string(),
828 };
829
830 let datum = if let Some(field) = field {
831 let Some(bson_doc) = bson_doc.get(field) else {
832 return Err(type_error(bson_doc));
833 };
834 bson_doc
835 } else {
836 bson_doc
837 };
838
839 if datum.is_null() {
840 return Ok(None);
841 }
842
843 let field_datum: Datum = match type_expected {
844 DataType::Boolean => {
845 if datum.is_boolean() {
846 Some(ScalarImpl::Bool(datum.as_bool().unwrap()))
847 } else {
848 return Err(type_error(datum));
849 }
850 }
851 DataType::Jsonb => ScalarImpl::Jsonb(datum.clone().into()).into(),
852 DataType::Varchar => match datum {
853 serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
854 serde_json::Value::Object(obj) if obj.contains_key("$oid") && field == Some("_id") => {
855 obj["oid"].as_str().map(|s| ScalarImpl::Utf8(s.into()))
856 }
857 _ => return Err(type_error(datum)),
858 },
859 DataType::Int16
860 | DataType::Int32
861 | DataType::Int64
862 | DataType::Int256
863 | DataType::Float32
864 | DataType::Float64 => {
865 if !datum.is_object() {
866 return Err(type_error(datum));
867 };
868
869 bson_extract_number(datum, type_expected)?
870 }
871
872 DataType::Date | DataType::Timestamp | DataType::Timestamptz => {
873 if let serde_json::Value::Object(mp) = datum {
874 if mp.contains_key("$timestamp") && mp["$timestamp"].is_object() {
875 bson_extract_timestamp(datum, type_expected)?
876 } else if mp.contains_key("$date") {
877 bson_extract_date(datum, type_expected)?
878 } else {
879 return Err(type_error(datum));
880 }
881 } else {
882 return Err(type_error(datum));
883 }
884 }
885 DataType::Decimal => {
886 if let serde_json::Value::Object(obj) = datum
887 && obj.contains_key("$numberDecimal")
888 && obj["$numberDecimal"].is_string()
889 {
890 let number = obj["$numberDecimal"].as_str().unwrap();
891
892 let dec = risingwave_common::types::Decimal::from_str(number).map_err(|_| {
893 AccessError::TypeError {
894 expected: type_expected.to_string(),
895 got: "unparsable string".into(),
896 value: number.to_owned(),
897 }
898 })?;
899 Some(ScalarImpl::Decimal(dec))
900 } else {
901 return Err(type_error(datum));
902 }
903 }
904
905 DataType::Bytea => {
906 if let serde_json::Value::Object(obj) = datum
907 && obj.contains_key("$binary")
908 && obj["$binary"].is_object()
909 {
910 use base64::Engine;
911
912 let binary = obj["$binary"].as_object().unwrap();
913
914 if !binary.contains_key("$base64")
915 || !binary["$base64"].is_string()
916 || !binary.contains_key("$subType")
917 || !binary["$subType"].is_string()
918 {
919 return Err(AccessError::TypeError {
920 expected: type_expected.to_string(),
921 got: "object".into(),
922 value: datum.to_string(),
923 });
924 }
925
926 let b64_str = binary["$base64"]
927 .as_str()
928 .ok_or_else(|| AccessError::TypeError {
929 expected: type_expected.to_string(),
930 got: "object".into(),
931 value: datum.to_string(),
932 })?;
933
934 let _type_str =
936 binary["$subType"]
937 .as_str()
938 .ok_or_else(|| AccessError::TypeError {
939 expected: type_expected.to_string(),
940 got: "object".into(),
941 value: datum.to_string(),
942 })?;
943
944 let bytes = base64::prelude::BASE64_STANDARD
945 .decode(b64_str)
946 .map_err(|_| AccessError::TypeError {
947 expected: "$binary object with $base64 string and $subType string field"
948 .to_owned(),
949 got: "string".to_owned(),
950 value: bson_doc.to_string(),
951 })?;
952 let bytea = ScalarImpl::Bytea(bytes.into());
953 Some(bytea)
954 } else {
955 return Err(type_error(datum));
956 }
957 }
958
959 DataType::Struct(struct_fields) => {
960 let mut datums = vec![];
961 for (field_name, field_type) in struct_fields.iter() {
962 let field_datum = extract_bson_field(field_type, datum, Some(field_name))?;
963 datums.push(field_datum);
964 }
965 let value = StructValue::new(datums);
966
967 Some(ScalarImpl::Struct(value))
968 }
969
970 DataType::List(list_type) => {
971 let elem_type = list_type.elem();
972 let Some(d_array) = datum.as_array() else {
973 return Err(type_error(datum));
974 };
975
976 let mut builder = elem_type.create_array_builder(d_array.len());
977 for item in d_array {
978 builder.append(extract_bson_field(elem_type, item, None)?);
979 }
980 Some(ScalarImpl::from(ListValue::new(builder.finish())))
981 }
982
983 _ => {
984 if let Some(field_name) = field {
985 unreachable!(
986 "DebeziumMongoJsonParser::new must ensure {field_name} column datatypes."
987 )
988 } else {
989 let type_expected = type_expected.to_string();
990 unreachable!(
991 "DebeziumMongoJsonParser::new must ensure type of `{type_expected}` matches datum `{datum}`"
992 )
993 }
994 }
995 };
996 Ok(field_datum)
997}
998
999fn bson_extract_number(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1000 let field_name = match type_expected {
1001 DataType::Int16 => "$numberInt",
1002 DataType::Int32 => "$numberInt",
1003 DataType::Int64 => "$numberLong",
1004 DataType::Int256 => "$numberLong",
1005 DataType::Float32 => "$numberDouble",
1006 DataType::Float64 => "$numberDouble",
1007 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1008 };
1009
1010 let datum = bson_doc.get(field_name);
1011 if datum.is_none() {
1012 return Err(AccessError::TypeError {
1013 expected: type_expected.to_string(),
1014 got: "object".into(),
1015 value: bson_doc.to_string(),
1016 });
1017 }
1018
1019 let datum = datum.unwrap();
1020
1021 if datum.is_string() {
1022 let Some(num_str) = datum.as_str() else {
1023 return Err(AccessError::TypeError {
1024 expected: type_expected.to_string(),
1025 got: "string".into(),
1026 value: datum.to_string(),
1027 });
1028 };
1029 if [DataType::Float32, DataType::Float64].contains(type_expected) {
1031 match (num_str, type_expected) {
1032 ("Infinity", DataType::Float64) => {
1033 return Ok(Some(ScalarImpl::Float64(f64::INFINITY.into())));
1034 }
1035 ("Infinity", DataType::Float32) => {
1036 return Ok(Some(ScalarImpl::Float32(f32::INFINITY.into())));
1037 }
1038 ("-Infinity", DataType::Float64) => {
1039 return Ok(Some(ScalarImpl::Float64(f64::NEG_INFINITY.into())));
1040 }
1041 ("-Infinity", DataType::Float32) => {
1042 return Ok(Some(ScalarImpl::Float32(f32::NEG_INFINITY.into())));
1043 }
1044 ("NaN", DataType::Float64) => {
1045 return Ok(Some(ScalarImpl::Float64(f64::NAN.into())));
1046 }
1047 ("NaN", DataType::Float32) => {
1048 return Ok(Some(ScalarImpl::Float32(f32::NAN.into())));
1049 }
1050 _ => {}
1051 }
1052
1053 let parsed_num: f64 = match num_str.parse() {
1054 Ok(n) => n,
1055 Err(_e) => {
1056 return Err(AccessError::TypeError {
1057 expected: type_expected.to_string(),
1058 got: "string".into(),
1059 value: num_str.to_owned(),
1060 });
1061 }
1062 };
1063 if *type_expected == DataType::Float64 {
1064 return Ok(Some(ScalarImpl::Float64(parsed_num.into())));
1065 } else {
1066 let parsed_num = parsed_num as f32;
1067 return Ok(Some(ScalarImpl::Float32(parsed_num.into())));
1068 }
1069 }
1070 if *type_expected == DataType::Int256 {
1072 let parsed_num = match Int256::from_str(num_str) {
1073 Ok(n) => n,
1074 Err(_) => {
1075 return Err(AccessError::TypeError {
1076 expected: type_expected.to_string(),
1077 got: "string".into(),
1078 value: num_str.to_owned(),
1079 });
1080 }
1081 };
1082 return Ok(Some(ScalarImpl::Int256(parsed_num)));
1083 }
1084
1085 let parsed_num: i64 = match num_str.parse() {
1087 Ok(n) => n,
1088 Err(_e) => {
1089 return Err(AccessError::TypeError {
1090 expected: type_expected.to_string(),
1091 got: "string".into(),
1092 value: num_str.to_owned(),
1093 });
1094 }
1095 };
1096 match type_expected {
1097 DataType::Int16 => {
1098 if parsed_num < i16::MIN as i64 || parsed_num > i16::MAX as i64 {
1099 return Err(AccessError::TypeError {
1100 expected: type_expected.to_string(),
1101 got: "string".into(),
1102 value: num_str.to_owned(),
1103 });
1104 }
1105 return Ok(Some(ScalarImpl::Int16(parsed_num as i16)));
1106 }
1107 DataType::Int32 => {
1108 if parsed_num < i32::MIN as i64 || parsed_num > i32::MAX as i64 {
1109 return Err(AccessError::TypeError {
1110 expected: type_expected.to_string(),
1111 got: "string".into(),
1112 value: num_str.to_owned(),
1113 });
1114 }
1115 return Ok(Some(ScalarImpl::Int32(parsed_num as i32)));
1116 }
1117 DataType::Int64 => {
1118 return Ok(Some(ScalarImpl::Int64(parsed_num)));
1119 }
1120 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1121 }
1122 }
1123 if datum.is_null() {
1124 return Err(AccessError::TypeError {
1125 expected: type_expected.to_string(),
1126 got: "null".into(),
1127 value: bson_doc.to_string(),
1128 });
1129 }
1130
1131 if datum.is_array() {
1132 return Err(AccessError::TypeError {
1133 expected: type_expected.to_string(),
1134 got: "array".to_owned(),
1135 value: datum.to_string(),
1136 });
1137 }
1138
1139 if datum.is_object() {
1140 return Err(AccessError::TypeError {
1141 expected: type_expected.to_string(),
1142 got: "object".to_owned(),
1143 value: datum.to_string(),
1144 });
1145 }
1146
1147 if datum.is_boolean() {
1148 return Err(AccessError::TypeError {
1149 expected: type_expected.to_string(),
1150 got: "boolean".into(),
1151 value: bson_doc.to_string(),
1152 });
1153 }
1154
1155 if datum.is_number() {
1156 let got_type = if datum.is_f64() { "f64" } else { "i64" };
1157 return Err(AccessError::TypeError {
1158 expected: type_expected.to_string(),
1159 got: got_type.into(),
1160 value: bson_doc.to_string(),
1161 });
1162 }
1163
1164 Err(AccessError::TypeError {
1165 expected: type_expected.to_string(),
1166 got: "unknown".into(),
1167 value: bson_doc.to_string(),
1168 })
1169}
1170
1171fn bson_extract_date(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1172 let datum = &bson_doc["$date"];
1187
1188 let type_error = || AccessError::TypeError {
1189 expected: type_expected.to_string(),
1190 got: match bson_doc {
1191 serde_json::Value::Null => "null",
1192 serde_json::Value::Bool(_) => "bool",
1193 serde_json::Value::Number(_) => "number",
1194 serde_json::Value::String(_) => "string",
1195 serde_json::Value::Array(_) => "array",
1196 serde_json::Value::Object(_) => "object",
1197 }
1198 .to_owned(),
1199 value: datum.to_string(),
1200 };
1201
1202 let millis = match datum {
1204 serde_json::Value::Object(obj)
1206 if obj.contains_key("$numberLong") && obj["$numberLong"].is_string() =>
1207 {
1208 obj["$numberLong"]
1209 .as_str()
1210 .unwrap()
1211 .parse::<i64>()
1212 .map_err(|_| AccessError::TypeError {
1213 expected: "timestamp".into(),
1214 got: "object".into(),
1215 value: datum.to_string(),
1216 })?
1217 }
1218 serde_json::Value::String(s) => {
1220 let dt =
1221 chrono::DateTime::parse_from_rfc3339(s).map_err(|_| AccessError::TypeError {
1222 expected: "valid ISO-8601 date string".into(),
1223 got: "string".into(),
1224 value: datum.to_string(),
1225 })?;
1226 dt.timestamp_millis()
1227 }
1228
1229 serde_json::Value::Number(num) => num.as_i64().ok_or_else(|| AccessError::TypeError {
1232 expected: "timestamp".into(),
1233 got: "number".into(),
1234 value: datum.to_string(),
1235 })?,
1236
1237 _ => return Err(type_error()),
1238 };
1239
1240 let datetime =
1241 chrono::DateTime::from_timestamp_millis(millis).ok_or_else(|| AccessError::TypeError {
1242 expected: "timestamp".into(),
1243 got: "object".into(),
1244 value: datum.to_string(),
1245 })?;
1246
1247 let res = match type_expected {
1248 DataType::Date => {
1249 let naive = datetime.naive_local();
1250 let dt = naive.date();
1251 Some(ScalarImpl::Date(dt.into()))
1252 }
1253 DataType::Time => {
1254 let naive = datetime.naive_local();
1255 let dt = naive.time();
1256 Some(ScalarImpl::Time(dt.into()))
1257 }
1258 DataType::Timestamp => {
1259 let naive = datetime.naive_local();
1260 let dt = Timestamp::from(naive);
1261 Some(ScalarImpl::Timestamp(dt))
1262 }
1263 DataType::Timestamptz => {
1264 let dt = datetime.into();
1265 Some(ScalarImpl::Timestamptz(dt))
1266 }
1267 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1268 };
1269 Ok(res)
1270}
1271
1272fn bson_extract_timestamp(bson_doc: &serde_json::Value, type_expected: &DataType) -> AccessResult {
1273 let Some(obj) = bson_doc["$timestamp"].as_object() else {
1291 return Err(AccessError::TypeError {
1292 expected: "timestamp".into(),
1293 got: "object".into(),
1294 value: bson_doc.to_string(),
1295 });
1296 };
1297
1298 if !obj.contains_key("t") || !obj["t"].is_u64() || !obj.contains_key("i") || !obj["i"].is_u64()
1299 {
1300 return Err(AccessError::TypeError {
1301 expected: "timestamp with valid seconds since epoch".into(),
1302 got: "object".into(),
1303 value: bson_doc.to_string(),
1304 });
1305 }
1306
1307 let since_epoch = obj["t"].as_i64().ok_or_else(|| AccessError::TypeError {
1308 expected: "timestamp with valid seconds since epoch".into(),
1309 got: "object".into(),
1310 value: bson_doc.to_string(),
1311 })?;
1312
1313 let chrono_datetime =
1314 chrono::DateTime::from_timestamp(since_epoch, 0).ok_or_else(|| AccessError::TypeError {
1315 expected: type_expected.to_string(),
1316 got: "object".to_owned(),
1317 value: bson_doc.to_string(),
1318 })?;
1319
1320 let res = match type_expected {
1321 DataType::Date => {
1322 let naive = chrono_datetime.naive_local();
1323 let dt = naive.date();
1324 Some(ScalarImpl::Date(dt.into()))
1325 }
1326 DataType::Time => {
1327 let naive = chrono_datetime.naive_local();
1328 let dt = naive.time();
1329 Some(ScalarImpl::Time(dt.into()))
1330 }
1331 DataType::Timestamp => {
1332 let naive = chrono_datetime.naive_local();
1333 let dt = Timestamp::from(naive);
1334 Some(ScalarImpl::Timestamp(dt))
1335 }
1336 DataType::Timestamptz => {
1337 let dt = chrono_datetime.into();
1338 Some(ScalarImpl::Timestamptz(dt))
1339 }
1340 _ => unreachable!("DebeziumMongoJsonParser::new must ensure column datatypes."),
1341 };
1342
1343 Ok(res)
1344}
1345
1346impl<A> MongoJsonAccess<A> {
1347 pub fn new(accessor: A, strong_schema: bool) -> Self {
1348 Self {
1349 accessor,
1350 strong_schema,
1351 }
1352 }
1353}
1354
1355impl<A> Access for MongoJsonAccess<A>
1356where
1357 A: Access,
1358{
1359 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
1360 match path {
1361 ["after" | "before", "_id"] => {
1362 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1363 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1364 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1365 } else {
1366 Err(AccessError::Undefined {
1368 name: "_id".to_owned(),
1369 path: path[0].to_owned(),
1370 })?
1371 }
1372 }
1373
1374 ["after" | "before", "payload"] if !self.strong_schema => {
1375 self.access(&[path[0]], &DataType::Jsonb)
1376 }
1377
1378 ["after" | "before", field] if self.strong_schema => {
1379 let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
1380 if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
1381 Ok(extract_bson_field(type_expected, &bson_doc.take(), Some(field))?.into())
1382 } else {
1383 Err(AccessError::Undefined {
1385 name: field.to_string(),
1386 path: path[0].to_owned(),
1387 })?
1388 }
1389 }
1390
1391 ["_id"] => {
1395 let ret = self.accessor.access(path, type_expected);
1396 if matches!(ret, Err(AccessError::Undefined { .. })) {
1397 let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
1398 if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
1399 Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
1400 } else {
1401 Err(AccessError::Undefined {
1403 name: "_id".to_owned(),
1404 path: "id".to_owned(),
1405 })?
1406 }
1407 } else {
1408 ret
1409 }
1410 }
1411 _ => self.accessor.access(path, type_expected),
1412 }
1413 }
1414}