1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppressor;
24use risingwave_common::types::{
25 DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26 ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31 TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
40fn try_parse_debezium_geometry_as_bytea(
58 value: &BorrowedValue<'_>,
59 create_error: impl Fn() -> AccessError,
60) -> AccessResult<Option<Box<[u8]>>> {
61 let obj = match value.as_object() {
62 Some(obj) => obj,
63 None => return Ok(None),
64 };
65
66 let srid = obj.get("srid").and_then(|v| v.as_i64());
69 let wkb = obj.get("wkb").and_then(|v| v.as_str());
70
71 let (Some(_srid), Some(wkb)) = (srid, wkb) else {
72 return Ok(None);
73 };
74
75 let bytes = base64::engine::general_purpose::STANDARD
76 .decode(wkb)
77 .map_err(|_| create_error())?
78 .into_boxed_slice();
79
80 Ok(Some(bytes))
81}
82
/// How a JSON string is decoded into a `bytea` value.
#[derive(Debug, Clone)]
pub enum ByteaHandling {
    /// Decode with the standard `bytea` text format (`str_to_bytea`).
    Standard,
    /// Decode the string as standard base64.
    Base64,
}
/// Unit used to interpret a JSON integer as a `time` value.
#[derive(Debug, Clone)]
pub enum TimeHandling {
    /// The integer is a count of milliseconds (`Time::with_milli`).
    Milli,
    /// The integer is a count of microseconds (`Time::with_micro`).
    Micro,
}
94
/// How a decimal column's JSON string is decoded when it is not a plain decimal literal.
///
/// NOTE(review): the name suggests this mirrors Debezium's `bigint.unsigned.handling.mode`;
/// confirm against the connector configuration.
#[derive(Debug, Clone, Copy)]
pub enum BigintUnsignedHandlingMode {
    /// No fallback: a string that is not a decimal literal is rejected.
    Long,
    /// Fall back to decoding the string as base64 signed big-endian integer bytes.
    Precise,
}
102
/// How JSON strings and integers are converted into `timestamptz` values.
#[derive(Debug, Clone)]
pub enum TimestamptzHandling {
    /// Strings are parsed as `timestamptz` literals; integers are rejected.
    UtcString,
    /// Strings are parsed as naive timestamps and interpreted as UTC; integers are rejected.
    UtcWithoutSuffix,
    /// Integers are milliseconds since the Unix epoch.
    Milli,
    /// Integers are microseconds since the Unix epoch.
    Micro,
    /// The unit of an integer is inferred from its magnitude (`i64_to_timestamptz`).
    GuessNumberUnit,
}
120
121impl TimestamptzHandling {
122 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
123
124 pub fn from_options(
125 options: &std::collections::BTreeMap<String, String>,
126 ) -> Result<Option<Self>, InvalidOptionError> {
127 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
128 Some("utc_string") => Self::UtcString,
129 Some("utc_without_suffix") => Self::UtcWithoutSuffix,
130 Some("micro") => Self::Micro,
131 Some("milli") => Self::Milli,
132 Some("guess_number_unit") => Self::GuessNumberUnit,
133 Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
134 None => return Ok(None),
135 };
136 Ok(Some(mode))
137 }
138}
139
/// Unit used to interpret a JSON integer as a `timestamp` value.
#[derive(Debug, Clone)]
pub enum TimestampHandling {
    /// The integer is milliseconds since the Unix epoch (`Timestamp::with_millis`).
    Milli,
    /// The unit is inferred from the integer's magnitude (`i64_to_timestamp`).
    GuessNumberUnit,
}
145
/// How a `jsonb` column is populated from the JSON input.
#[derive(Debug, Clone)]
pub enum JsonValueHandling {
    /// Any JSON value is converted directly into `jsonb`.
    AsValue,
    /// The column must be a JSON string whose contents are parsed as JSON text.
    AsString,
}
/// How lenient integer and floating-point columns are about their JSON representation.
#[derive(Debug, Clone)]
pub enum NumericHandling {
    /// Integer columns accept JSON integers and float columns accept JSON floats only.
    Strict,
    /// Float columns additionally accept JSON integers.
    Relax {
        /// Also parse numeric strings into integer and float columns.
        string_parsing: bool,
    },
}
/// How lenient `boolean` columns are about their JSON representation.
#[derive(Debug, Clone)]
pub enum BooleanHandling {
    /// Only JSON `true` / `false` are accepted.
    Strict,
    /// The JSON integers `0` and `1` are also accepted.
    Relax {
        /// Also accept the strings `"true"` / `"false"` (case-insensitive).
        string_parsing: bool,
        /// Also accept the strings `"1"` / `"0"`; only effective together with `string_parsing`.
        string_integer_parsing: bool,
    },
}
171
/// Which JSON value types a `varchar` column accepts.
#[derive(Debug, Clone)]
pub enum VarcharHandling {
    /// Only JSON strings.
    Strict,
    /// JSON strings, plus booleans and numbers rendered as their JSON text.
    OnlyPrimaryTypes,
    /// JSON strings, plus booleans, numbers, arrays and objects rendered as their JSON text.
    AllTypes,
}
181
/// Which JSON value types a struct column accepts.
#[derive(Debug, Clone)]
pub enum StructHandling {
    /// Only JSON objects.
    Strict,
    /// JSON objects, or a JSON string whose contents are parsed as JSON and then converted.
    AllowJsonString,
}
190
191#[derive(Clone, Debug)]
192pub struct JsonParseOptions {
193 pub bytea_handling: ByteaHandling,
194 pub time_handling: TimeHandling,
195 pub timestamp_handling: TimestampHandling,
196 pub timestamptz_handling: TimestamptzHandling,
197 pub json_value_handling: JsonValueHandling,
198 pub numeric_handling: NumericHandling,
199 pub boolean_handling: BooleanHandling,
200 pub varchar_handling: VarcharHandling,
201 pub struct_handling: StructHandling,
202 pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
203 pub ignoring_keycase: bool,
204 pub handle_toast_columns: bool,
205}
206
207impl Default for JsonParseOptions {
208 fn default() -> Self {
209 Self::DEFAULT.clone()
210 }
211}
212
213impl JsonParseOptions {
214 pub const CANAL: JsonParseOptions = JsonParseOptions {
215 bytea_handling: ByteaHandling::Standard,
216 time_handling: TimeHandling::Micro,
217 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
220 numeric_handling: NumericHandling::Relax {
221 string_parsing: true,
222 },
223 boolean_handling: BooleanHandling::Relax {
224 string_parsing: true,
225 string_integer_parsing: true,
226 },
227 varchar_handling: VarcharHandling::Strict,
228 struct_handling: StructHandling::Strict,
229 bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
231 handle_toast_columns: false,
232 };
233 pub const DEFAULT: JsonParseOptions = JsonParseOptions {
234 bytea_handling: ByteaHandling::Standard,
235 time_handling: TimeHandling::Micro,
236 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
239 numeric_handling: NumericHandling::Relax {
240 string_parsing: true,
241 },
242 boolean_handling: BooleanHandling::Strict,
243 varchar_handling: VarcharHandling::OnlyPrimaryTypes,
244 struct_handling: StructHandling::AllowJsonString,
245 bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
247 handle_toast_columns: false,
248 };
249
250 pub fn new_for_debezium(
251 timestamptz_handling: TimestamptzHandling,
252 timestamp_handling: TimestampHandling,
253 time_handling: TimeHandling,
254 bigint_unsigned_handling: BigintUnsignedHandlingMode,
255 handle_toast_columns: bool,
256 ) -> Self {
257 Self {
258 bytea_handling: ByteaHandling::Base64,
259 time_handling,
260 timestamp_handling,
261 timestamptz_handling,
262 json_value_handling: JsonValueHandling::AsString,
263 numeric_handling: NumericHandling::Relax {
264 string_parsing: false,
265 },
266 boolean_handling: BooleanHandling::Relax {
267 string_parsing: false,
268 string_integer_parsing: false,
269 },
270 varchar_handling: VarcharHandling::Strict,
271 struct_handling: StructHandling::Strict,
272 bigint_unsigned_handling,
273 ignoring_keycase: true,
274 handle_toast_columns,
275 }
276 }
277
278 pub fn parse<'a>(
279 &self,
280 value: &'a BorrowedValue<'a>,
281 type_expected: &DataType,
282 ) -> AccessResult<DatumCow<'a>> {
283 let create_error = || AccessError::TypeError {
284 expected: format!("{:?}", type_expected),
285 got: value.value_type().to_string(),
286 value: value.to_string(),
287 };
288 let v: ScalarImpl = match (type_expected, value.value_type()) {
289 (_, ValueType::Null) => return Ok(DatumCow::NULL),
290 (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
292
293 (
294 DataType::Boolean,
295 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
296 ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
297 && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
298 {
299 (value.as_i64() == Some(1i64)).into()
300 }
301
302 (DataType::Boolean, ValueType::String)
303 if matches!(
304 self.boolean_handling,
305 BooleanHandling::Relax {
306 string_parsing: true,
307 ..
308 }
309 ) =>
310 {
311 match value.as_str().unwrap().to_lowercase().as_str() {
312 "true" => true.into(),
313 "false" => false.into(),
314 c @ ("1" | "0")
315 if matches!(
316 self.boolean_handling,
317 BooleanHandling::Relax {
318 string_parsing: true,
319 string_integer_parsing: true
320 }
321 ) =>
322 {
323 if c == "1" {
324 true.into()
325 } else {
326 false.into()
327 }
328 }
329 _ => Err(create_error())?,
330 }
331 }
332 (
334 DataType::Int16,
335 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
336 ) => value.try_as_i16().map_err(|_| create_error())?.into(),
337
338 (DataType::Int16, ValueType::String)
339 if matches!(
340 self.numeric_handling,
341 NumericHandling::Relax {
342 string_parsing: true
343 }
344 ) =>
345 {
346 value
347 .as_str()
348 .unwrap()
349 .parse::<i16>()
350 .map_err(|_| create_error())?
351 .into()
352 }
353 (
355 DataType::Int32,
356 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
357 ) => value.try_as_i32().map_err(|_| create_error())?.into(),
358
359 (DataType::Int32, ValueType::String)
360 if matches!(
361 self.numeric_handling,
362 NumericHandling::Relax {
363 string_parsing: true
364 }
365 ) =>
366 {
367 value
368 .as_str()
369 .unwrap()
370 .parse::<i32>()
371 .map_err(|_| create_error())?
372 .into()
373 }
374 (
376 DataType::Int64,
377 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
378 ) => value.try_as_i64().map_err(|_| create_error())?.into(),
379
380 (DataType::Int64, ValueType::String)
381 if matches!(
382 self.numeric_handling,
383 NumericHandling::Relax {
384 string_parsing: true
385 }
386 ) =>
387 {
388 value
389 .as_str()
390 .unwrap()
391 .parse::<i64>()
392 .map_err(|_| create_error())?
393 .into()
394 }
395 (
397 DataType::Float32,
398 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
399 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
400 (value.try_as_i64().map_err(|_| create_error())? as f32).into()
401 }
402 (DataType::Float32, ValueType::String)
403 if matches!(
404 self.numeric_handling,
405 NumericHandling::Relax {
406 string_parsing: true
407 }
408 ) =>
409 {
410 value
411 .as_str()
412 .unwrap()
413 .parse::<f32>()
414 .map_err(|_| create_error())?
415 .into()
416 }
417 (DataType::Float32, ValueType::F64) => {
418 value.try_as_f32().map_err(|_| create_error())?.into()
419 }
420 (
422 DataType::Float64,
423 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
424 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
425 (value.try_as_i64().map_err(|_| create_error())? as f64).into()
426 }
427 (DataType::Float64, ValueType::String)
428 if matches!(
429 self.numeric_handling,
430 NumericHandling::Relax {
431 string_parsing: true
432 }
433 ) =>
434 {
435 value
436 .as_str()
437 .unwrap()
438 .parse::<f64>()
439 .map_err(|_| create_error())?
440 .into()
441 }
442 (DataType::Float64, ValueType::F64) => {
443 value.try_as_f64().map_err(|_| create_error())?.into()
444 }
445 (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
447 Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
448 .map_err(|_| create_error())?
449 .into()
450 }
451 (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
452 let i64_val = value.try_as_i64().map_err(|_| create_error())?;
453 Decimal::from(i64_val).into()
454 }
455 (DataType::Decimal, ValueType::String) => {
456 let str_val = value.as_str().unwrap();
457 match str_val {
459 "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
460 "POSITIVE_INFINITY" => {
461 return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
462 Decimal::PositiveInf,
463 ))));
464 }
465 "NEGATIVE_INFINITY" => {
466 return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
467 Decimal::NegativeInf,
468 ))));
469 }
470 _ => {}
471 }
472
473 Decimal::from_str(str_val)
474 .or_else(|_err| {
475 try_base64_decode_decimal(
476 str_val,
477 self.bigint_unsigned_handling,
478 create_error,
479 )
480 })?
481 .into()
482 }
483
484 (DataType::Decimal, ValueType::F64) => {
485 Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
486 .map_err(|_| create_error())?
487 .into()
488 }
489 (DataType::Decimal, ValueType::Object) => {
490 let scale = value
493 .get("scale")
494 .ok_or_else(create_error)?
495 .as_i32()
496 .unwrap();
497 let value = value
498 .get("value")
499 .ok_or_else(create_error)?
500 .as_str()
501 .unwrap()
502 .as_bytes();
503 let unscaled = BigInt::from_signed_bytes_be(value);
504 let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
505 ScalarImpl::Decimal(Decimal::Normalized(decimal))
506 }
507 (
509 DataType::Date,
510 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
511 ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
512 .map_err(|_| create_error())?
513 .into(),
514 (DataType::Date, ValueType::String) => value
515 .as_str()
516 .unwrap()
517 .parse::<Date>()
518 .map_err(|_| create_error())?
519 .into(),
520 (DataType::Varchar, ValueType::String) => {
522 return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
523 }
524 (
525 DataType::Varchar,
526 ValueType::Bool
527 | ValueType::I64
528 | ValueType::I128
529 | ValueType::U64
530 | ValueType::U128
531 | ValueType::F64,
532 ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
533 value.to_string().into()
534 }
535 (
536 DataType::Varchar,
537 ValueType::Bool
538 | ValueType::I64
539 | ValueType::I128
540 | ValueType::U64
541 | ValueType::U128
542 | ValueType::F64
543 | ValueType::Array
544 | ValueType::Object,
545 ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
546 value.to_string().into()
547 }
548 (DataType::Time, ValueType::String) => value
550 .as_str()
551 .unwrap()
552 .parse::<Time>()
553 .map_err(|_| create_error())?
554 .into(),
555 (
556 DataType::Time,
557 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
558 ) => value
559 .as_i64()
560 .map(|i| match self.time_handling {
561 TimeHandling::Milli => Time::with_milli(i as u32),
562 TimeHandling::Micro => Time::with_micro(i as u64),
563 })
564 .unwrap()
565 .map_err(|_| create_error())?
566 .into(),
567 (DataType::Timestamp, ValueType::String) => value
569 .as_str()
570 .unwrap()
571 .parse::<Timestamp>()
572 .map_err(|_| create_error())?
573 .into(),
574 (
575 DataType::Timestamp,
576 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
577 ) => {
578 match self.timestamp_handling {
579 TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
582 .map_err(|_| create_error())?
583 .into(),
584 TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
585 .map_err(|_| create_error())?
586 .into(),
587 }
588 }
589 (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
591 TimestamptzHandling::UtcWithoutSuffix => value
592 .as_str()
593 .unwrap()
594 .parse::<Timestamp>()
595 .map(|naive_utc| {
596 Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
597 })
598 .map_err(|_| create_error())?
599 .into(),
600 _ => value
602 .as_str()
603 .unwrap()
604 .parse::<Timestamptz>()
605 .map_err(|_| create_error())?
606 .into(),
607 },
608 (
609 DataType::Timestamptz,
610 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
611 ) => value
612 .as_i64()
613 .and_then(|num| match self.timestamptz_handling {
614 TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
615 TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
616 TimestamptzHandling::Milli => Timestamptz::from_millis(num),
617 TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
619 })
620 .ok_or_else(create_error)?
621 .into(),
622 (DataType::Interval, ValueType::String) => value
624 .as_str()
625 .unwrap()
626 .parse::<Interval>()
627 .map_err(|_| create_error())?
628 .into(),
629 (DataType::Struct(struct_type_info), ValueType::Object) => {
631 let mut fields = Vec::with_capacity(struct_type_info.len());
634 for (field_name, field_type) in struct_type_info.iter() {
635 let field_value = json_object_get_case_insensitive(value, field_name)
636 .unwrap_or_else(|| {
637 let error = AccessError::Undefined {
638 name: field_name.to_owned(),
639 path: struct_type_info.to_string(), };
641 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
643 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
644 tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
645 }
646 &BorrowedValue::Static(simd_json::StaticNode::Null)
647 });
648 fields.push(
649 self.parse(field_value, field_type)
650 .map(|d| d.to_owned_datum())?,
651 );
652 }
653 StructValue::new(fields).into()
654 }
655
656 (DataType::Struct(_), ValueType::String)
659 if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
660 {
661 let mut value = value.as_str().unwrap().as_bytes().to_vec();
663 let value =
664 simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
665 return self
666 .parse(&value, type_expected)
667 .map(|d| d.to_owned_datum().into());
668 }
669
670 (DataType::List(list_type), ValueType::Array) => ListValue::new({
672 let item_type = list_type.elem();
673 let array = value.as_array().unwrap();
674 let mut builder = item_type.create_array_builder(array.len());
675 for v in array {
676 let value = self.parse(v, item_type)?;
677 builder.append(value);
678 }
679 builder.finish()
680 })
681 .into(),
682
683 (DataType::Bytea, ValueType::String) => {
685 let value_str = value.as_str().unwrap();
686
687 match self.bytea_handling {
688 ByteaHandling::Standard => {
689 let mut buf = Vec::new();
690 str_to_bytea(value_str, &mut buf).map_err(|_| create_error())?;
691 buf.into()
692 }
693 ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
694 .decode(value_str)
695 .map_err(|_| create_error())?
696 .into_boxed_slice()
697 .into(),
698 }
699 }
700 (DataType::Bytea, ValueType::Object) => {
703 match try_parse_debezium_geometry_as_bytea(value, create_error)? {
704 Some(bytes) => bytes.into(),
705 None => Err(create_error())?,
706 }
707 }
708 (DataType::Jsonb, ValueType::String)
710 if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
711 {
712 match self.handle_toast_columns {
716 true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
717 .map_err(|_| create_error())?
718 .into(),
719 false => JsonbVal::from_str(value.as_str().unwrap())
720 .map_err(|_| create_error())?
721 .into(),
722 }
723 }
724 (DataType::Jsonb, _)
725 if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
726 {
727 let value: serde_json::Value =
728 value.clone().try_into().map_err(|_| create_error())?;
729 JsonbVal::from(value).into()
730 }
731 (
733 DataType::Int256,
734 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
735 ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
736
737 (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
738 .map_err(|_| create_error())?
739 .into(),
740
741 (_expected, _got) => Err(create_error())?,
742 };
743 Ok(DatumCow::Owned(Some(v)))
744 }
745}
746
747fn try_base64_decode_decimal(
755 str_val: &str,
756 bigint_unsigned_handling: BigintUnsignedHandlingMode,
757 create_error: impl Fn() -> AccessError,
758) -> Result<Decimal, AccessError> {
759 match bigint_unsigned_handling {
760 BigintUnsignedHandlingMode::Precise => {
761 let value = base64::engine::general_purpose::STANDARD
766 .decode(str_val)
767 .map_err(|_| create_error())?;
768 let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
769 Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
770 }
771 BigintUnsignedHandlingMode::Long => {
772 Err(create_error())
774 }
775 }
776}
777
778pub struct JsonAccess<'a> {
779 value: BorrowedValue<'a>,
780 options: &'a JsonParseOptions,
781}
782
783impl<'a> JsonAccess<'a> {
784 pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
785 Self { value, options }
786 }
787
788 pub fn new(value: BorrowedValue<'a>) -> Self {
789 Self::new_with_options(value, &JsonParseOptions::DEFAULT)
790 }
791}
792
793impl Access for JsonAccess<'_> {
794 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
795 let mut value = &self.value;
796
797 for (idx, &key) in path.iter().enumerate() {
798 if let Some(sub_value) = if self.options.ignoring_keycase {
799 json_object_get_case_insensitive(value, key)
800 } else {
801 value.get(key)
802 } {
803 value = sub_value;
804 } else {
805 Err(AccessError::Undefined {
806 name: key.to_owned(),
807 path: path.iter().take(idx).join("."),
808 })?;
809 }
810 }
811
812 self.options.parse(value, type_expected)
813 }
814}
815
816fn json_object_get_case_insensitive<'b>(
820 v: &'b simd_json::BorrowedValue<'b>,
821 key: &str,
822) -> Option<&'b simd_json::BorrowedValue<'b>> {
823 let obj = v.as_object()?;
824 let value = obj.get(key);
825 if value.is_some() {
826 return value; }
828 for (k, v) in obj {
829 if k.eq_ignore_ascii_case(key) {
830 return Some(v);
831 }
832 }
833 None
834}