1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25 DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26 ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31 TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded when the expected type is `Bytea`.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string as standard-alphabet base64.
    Base64,
}
/// Unit used to interpret a JSON integer when the expected type is `Time`.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
51
/// Controls the fallback applied to a `Decimal` string that is not a plain
/// decimal literal (see `try_base64_decode_decimal`).
#[derive(Clone, Copy, Debug)]
pub enum BigintUnsignedHandlingMode {
    /// No fallback: a string that fails to parse as a decimal is a type error.
    Long,
    /// Fall back to base64-decoding the string and reading the bytes as a
    /// big-endian signed integer.
    Precise,
}
59
/// How JSON values are interpreted when the expected type is `Timestamptz`.
///
/// Strings are parsed as `Timestamptz` for every variant except
/// [`TimestamptzHandling::UtcWithoutSuffix`]. The variant additionally decides
/// whether, and in which unit, JSON integers are accepted.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// Option value `utc_string`. Integers are rejected.
    UtcString,
    /// Option value `utc_without_suffix`. Strings are parsed as a naive
    /// `Timestamp` and then interpreted as UTC. Integers are rejected.
    UtcWithoutSuffix,
    /// Option value `milli`. Integers are passed to `Timestamptz::from_millis`.
    Milli,
    /// Option value `micro`. Integers are passed to `Timestamptz::from_micros`.
    Micro,
    /// Option value `guess_number_unit`. Integers are passed to
    /// `i64_to_timestamptz`.
    GuessNumberUnit,
}
77
78impl TimestamptzHandling {
79 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
80
81 pub fn from_options(
82 options: &std::collections::BTreeMap<String, String>,
83 ) -> Result<Option<Self>, InvalidOptionError> {
84 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
85 Some("utc_string") => Self::UtcString,
86 Some("utc_without_suffix") => Self::UtcWithoutSuffix,
87 Some("micro") => Self::Micro,
88 Some("milli") => Self::Milli,
89 Some("guess_number_unit") => Self::GuessNumberUnit,
90 Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
91 None => return Ok(None),
92 };
93 Ok(Some(mode))
94 }
95}
96
/// How a JSON integer is interpreted when the expected type is `Timestamp`.
#[derive(Clone, Debug)]
pub enum TimestampHandling {
    /// The integer is passed to `Timestamp::with_millis`.
    Milli,
    /// The integer is passed to `i64_to_timestamp`.
    GuessNumberUnit,
}
102
/// How JSON values are converted when the expected type is `Jsonb`.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any JSON value is converted to a jsonb value directly.
    AsValue,
    /// Only JSON strings are accepted; the string content is parsed into a
    /// jsonb value.
    AsString,
}
/// How lenient the parser is when the expected type is an integer or float.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// JSON integers are not accepted for float columns, and strings are not
    /// parsed into `Int16`/`Int32`/`Int64`/`Float32`/`Float64`.
    Strict,
    /// JSON integers are accepted for float columns.
    Relax {
        /// When `true`, JSON strings are also parsed into
        /// `Int16`/`Int32`/`Int64`/`Float32`/`Float64`.
        string_parsing: bool,
    },
}
/// How lenient the parser is when the expected type is `Boolean`.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// JSON integers `0` and `1` are additionally accepted.
    Relax {
        /// When `true`, the strings `true` / `false` (compared after
        /// lowercasing) are accepted.
        string_parsing: bool,
        /// When `true` together with `string_parsing`, the strings `"1"` and
        /// `"0"` are accepted as well.
        string_integer_parsing: bool,
    },
}
128
/// Which non-string JSON values are stringified when the expected type is
/// `Varchar`. JSON strings are always accepted.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Only JSON strings are accepted.
    Strict,
    /// Booleans, integers and floats are additionally converted to their
    /// string form.
    OnlyPrimaryTypes,
    /// Booleans, integers, floats, arrays and objects are additionally
    /// converted to their string form.
    AllTypes,
}
138
/// Which JSON values are accepted when the expected type is a struct.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only JSON objects are accepted.
    Strict,
    /// A JSON string is additionally accepted: its content is parsed as JSON
    /// and the result is parsed again against the same struct type.
    AllowJsonString,
}
147
/// Set of switches that control how `JsonParseOptions::parse` converts a JSON
/// value into a datum of an expected data type.
#[derive(Clone, Debug)]
pub struct JsonParseOptions {
    /// Decoding of strings into `Bytea`.
    pub bytea_handling: ByteaHandling,
    /// Unit of integers converted into `Time`.
    pub time_handling: TimeHandling,
    /// Interpretation of integers converted into `Timestamp`.
    pub timestamp_handling: TimestampHandling,
    /// Interpretation of strings and integers converted into `Timestamptz`.
    pub timestamptz_handling: TimestamptzHandling,
    /// Conversion of values into `Jsonb`.
    pub json_value_handling: JsonValueHandling,
    /// Leniency for integer and float columns.
    pub numeric_handling: NumericHandling,
    /// Leniency for boolean columns.
    pub boolean_handling: BooleanHandling,
    /// Which non-string values are stringified for `Varchar`.
    pub varchar_handling: VarcharHandling,
    /// Whether a JSON string may stand in for a struct.
    pub struct_handling: StructHandling,
    /// Fallback for decimal strings that are not plain decimal literals.
    pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
    /// When `true`, `JsonAccess::access` falls back to an ASCII
    /// case-insensitive key match while walking the path.
    pub ignoring_keycase: bool,
    /// When `true`, `Jsonb` strings are converted with
    /// `JsonbVal::from_debezium_unavailable_value` instead of
    /// `JsonbVal::from_str`.
    pub handle_toast_columns: bool,
}
163
164impl Default for JsonParseOptions {
165 fn default() -> Self {
166 Self::DEFAULT.clone()
167 }
168}
169
170impl JsonParseOptions {
171 pub const CANAL: JsonParseOptions = JsonParseOptions {
172 bytea_handling: ByteaHandling::Standard,
173 time_handling: TimeHandling::Micro,
174 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
177 numeric_handling: NumericHandling::Relax {
178 string_parsing: true,
179 },
180 boolean_handling: BooleanHandling::Relax {
181 string_parsing: true,
182 string_integer_parsing: true,
183 },
184 varchar_handling: VarcharHandling::Strict,
185 struct_handling: StructHandling::Strict,
186 bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
188 handle_toast_columns: false,
189 };
190 pub const DEFAULT: JsonParseOptions = JsonParseOptions {
191 bytea_handling: ByteaHandling::Standard,
192 time_handling: TimeHandling::Micro,
193 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
196 numeric_handling: NumericHandling::Relax {
197 string_parsing: true,
198 },
199 boolean_handling: BooleanHandling::Strict,
200 varchar_handling: VarcharHandling::OnlyPrimaryTypes,
201 struct_handling: StructHandling::AllowJsonString,
202 bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
204 handle_toast_columns: false,
205 };
206
207 pub fn new_for_debezium(
208 timestamptz_handling: TimestamptzHandling,
209 timestamp_handling: TimestampHandling,
210 time_handling: TimeHandling,
211 bigint_unsigned_handling: BigintUnsignedHandlingMode,
212 handle_toast_columns: bool,
213 ) -> Self {
214 Self {
215 bytea_handling: ByteaHandling::Base64,
216 time_handling,
217 timestamp_handling,
218 timestamptz_handling,
219 json_value_handling: JsonValueHandling::AsString,
220 numeric_handling: NumericHandling::Relax {
221 string_parsing: false,
222 },
223 boolean_handling: BooleanHandling::Relax {
224 string_parsing: false,
225 string_integer_parsing: false,
226 },
227 varchar_handling: VarcharHandling::Strict,
228 struct_handling: StructHandling::Strict,
229 bigint_unsigned_handling,
230 ignoring_keycase: true,
231 handle_toast_columns,
232 }
233 }
234
235 pub fn parse<'a>(
236 &self,
237 value: &'a BorrowedValue<'a>,
238 type_expected: &DataType,
239 ) -> AccessResult<DatumCow<'a>> {
240 let create_error = || AccessError::TypeError {
241 expected: format!("{:?}", type_expected),
242 got: value.value_type().to_string(),
243 value: value.to_string(),
244 };
245 let v: ScalarImpl = match (type_expected, value.value_type()) {
246 (_, ValueType::Null) => return Ok(DatumCow::NULL),
247 (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
249
250 (
251 DataType::Boolean,
252 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
253 ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
254 && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
255 {
256 (value.as_i64() == Some(1i64)).into()
257 }
258
259 (DataType::Boolean, ValueType::String)
260 if matches!(
261 self.boolean_handling,
262 BooleanHandling::Relax {
263 string_parsing: true,
264 ..
265 }
266 ) =>
267 {
268 match value.as_str().unwrap().to_lowercase().as_str() {
269 "true" => true.into(),
270 "false" => false.into(),
271 c @ ("1" | "0")
272 if matches!(
273 self.boolean_handling,
274 BooleanHandling::Relax {
275 string_parsing: true,
276 string_integer_parsing: true
277 }
278 ) =>
279 {
280 if c == "1" {
281 true.into()
282 } else {
283 false.into()
284 }
285 }
286 _ => Err(create_error())?,
287 }
288 }
289 (
291 DataType::Int16,
292 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
293 ) => value.try_as_i16().map_err(|_| create_error())?.into(),
294
295 (DataType::Int16, ValueType::String)
296 if matches!(
297 self.numeric_handling,
298 NumericHandling::Relax {
299 string_parsing: true
300 }
301 ) =>
302 {
303 value
304 .as_str()
305 .unwrap()
306 .parse::<i16>()
307 .map_err(|_| create_error())?
308 .into()
309 }
310 (
312 DataType::Int32,
313 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
314 ) => value.try_as_i32().map_err(|_| create_error())?.into(),
315
316 (DataType::Int32, ValueType::String)
317 if matches!(
318 self.numeric_handling,
319 NumericHandling::Relax {
320 string_parsing: true
321 }
322 ) =>
323 {
324 value
325 .as_str()
326 .unwrap()
327 .parse::<i32>()
328 .map_err(|_| create_error())?
329 .into()
330 }
331 (
333 DataType::Int64,
334 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
335 ) => value.try_as_i64().map_err(|_| create_error())?.into(),
336
337 (DataType::Int64, ValueType::String)
338 if matches!(
339 self.numeric_handling,
340 NumericHandling::Relax {
341 string_parsing: true
342 }
343 ) =>
344 {
345 value
346 .as_str()
347 .unwrap()
348 .parse::<i64>()
349 .map_err(|_| create_error())?
350 .into()
351 }
352 (
354 DataType::Float32,
355 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
356 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
357 (value.try_as_i64().map_err(|_| create_error())? as f32).into()
358 }
359 (DataType::Float32, ValueType::String)
360 if matches!(
361 self.numeric_handling,
362 NumericHandling::Relax {
363 string_parsing: true
364 }
365 ) =>
366 {
367 value
368 .as_str()
369 .unwrap()
370 .parse::<f32>()
371 .map_err(|_| create_error())?
372 .into()
373 }
374 (DataType::Float32, ValueType::F64) => {
375 value.try_as_f32().map_err(|_| create_error())?.into()
376 }
377 (
379 DataType::Float64,
380 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
381 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
382 (value.try_as_i64().map_err(|_| create_error())? as f64).into()
383 }
384 (DataType::Float64, ValueType::String)
385 if matches!(
386 self.numeric_handling,
387 NumericHandling::Relax {
388 string_parsing: true
389 }
390 ) =>
391 {
392 value
393 .as_str()
394 .unwrap()
395 .parse::<f64>()
396 .map_err(|_| create_error())?
397 .into()
398 }
399 (DataType::Float64, ValueType::F64) => {
400 value.try_as_f64().map_err(|_| create_error())?.into()
401 }
402 (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
404 Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
405 .map_err(|_| create_error())?
406 .into()
407 }
408 (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
409 let i64_val = value.try_as_i64().map_err(|_| create_error())?;
410 Decimal::from(i64_val).into()
411 }
412 (DataType::Decimal, ValueType::String) => {
413 let str_val = value.as_str().unwrap();
414 match str_val {
416 "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
417 "POSITIVE_INFINITY" => {
418 return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
419 Decimal::PositiveInf,
420 ))));
421 }
422 "NEGATIVE_INFINITY" => {
423 return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
424 Decimal::NegativeInf,
425 ))));
426 }
427 _ => {}
428 }
429
430 Decimal::from_str(str_val)
431 .or_else(|_err| {
432 try_base64_decode_decimal(
433 str_val,
434 self.bigint_unsigned_handling,
435 create_error,
436 )
437 })?
438 .into()
439 }
440
441 (DataType::Decimal, ValueType::F64) => {
442 Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
443 .map_err(|_| create_error())?
444 .into()
445 }
446 (DataType::Decimal, ValueType::Object) => {
447 let scale = value
450 .get("scale")
451 .ok_or_else(create_error)?
452 .as_i32()
453 .unwrap();
454 let value = value
455 .get("value")
456 .ok_or_else(create_error)?
457 .as_str()
458 .unwrap()
459 .as_bytes();
460 let unscaled = BigInt::from_signed_bytes_be(value);
461 let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
462 ScalarImpl::Decimal(Decimal::Normalized(decimal))
463 }
464 (
466 DataType::Date,
467 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
468 ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
469 .map_err(|_| create_error())?
470 .into(),
471 (DataType::Date, ValueType::String) => value
472 .as_str()
473 .unwrap()
474 .parse::<Date>()
475 .map_err(|_| create_error())?
476 .into(),
477 (DataType::Varchar, ValueType::String) => {
479 return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
480 }
481 (
482 DataType::Varchar,
483 ValueType::Bool
484 | ValueType::I64
485 | ValueType::I128
486 | ValueType::U64
487 | ValueType::U128
488 | ValueType::F64,
489 ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
490 value.to_string().into()
491 }
492 (
493 DataType::Varchar,
494 ValueType::Bool
495 | ValueType::I64
496 | ValueType::I128
497 | ValueType::U64
498 | ValueType::U128
499 | ValueType::F64
500 | ValueType::Array
501 | ValueType::Object,
502 ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
503 value.to_string().into()
504 }
505 (DataType::Time, ValueType::String) => value
507 .as_str()
508 .unwrap()
509 .parse::<Time>()
510 .map_err(|_| create_error())?
511 .into(),
512 (
513 DataType::Time,
514 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
515 ) => value
516 .as_i64()
517 .map(|i| match self.time_handling {
518 TimeHandling::Milli => Time::with_milli(i as u32),
519 TimeHandling::Micro => Time::with_micro(i as u64),
520 })
521 .unwrap()
522 .map_err(|_| create_error())?
523 .into(),
524 (DataType::Timestamp, ValueType::String) => value
526 .as_str()
527 .unwrap()
528 .parse::<Timestamp>()
529 .map_err(|_| create_error())?
530 .into(),
531 (
532 DataType::Timestamp,
533 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
534 ) => {
535 match self.timestamp_handling {
536 TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
539 .map_err(|_| create_error())?
540 .into(),
541 TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
542 .map_err(|_| create_error())?
543 .into(),
544 }
545 }
546 (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
548 TimestamptzHandling::UtcWithoutSuffix => value
549 .as_str()
550 .unwrap()
551 .parse::<Timestamp>()
552 .map(|naive_utc| {
553 Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
554 })
555 .map_err(|_| create_error())?
556 .into(),
557 _ => value
559 .as_str()
560 .unwrap()
561 .parse::<Timestamptz>()
562 .map_err(|_| create_error())?
563 .into(),
564 },
565 (
566 DataType::Timestamptz,
567 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
568 ) => value
569 .as_i64()
570 .and_then(|num| match self.timestamptz_handling {
571 TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
572 TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
573 TimestamptzHandling::Milli => Timestamptz::from_millis(num),
574 TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
576 })
577 .ok_or_else(create_error)?
578 .into(),
579 (DataType::Interval, ValueType::String) => value
581 .as_str()
582 .unwrap()
583 .parse::<Interval>()
584 .map_err(|_| create_error())?
585 .into(),
586 (DataType::Struct(struct_type_info), ValueType::Object) => {
588 let mut fields = Vec::with_capacity(struct_type_info.len());
591 for (field_name, field_type) in struct_type_info.iter() {
592 let field_value = json_object_get_case_insensitive(value, field_name)
593 .unwrap_or_else(|| {
594 let error = AccessError::Undefined {
595 name: field_name.to_owned(),
596 path: struct_type_info.to_string(), };
598 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
600 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
601 tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
602 }
603 &BorrowedValue::Static(simd_json::StaticNode::Null)
604 });
605 fields.push(
606 self.parse(field_value, field_type)
607 .map(|d| d.to_owned_datum())?,
608 );
609 }
610 StructValue::new(fields).into()
611 }
612
613 (DataType::Struct(_), ValueType::String)
616 if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
617 {
618 let mut value = value.as_str().unwrap().as_bytes().to_vec();
620 let value =
621 simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
622 return self
623 .parse(&value, type_expected)
624 .map(|d| d.to_owned_datum().into());
625 }
626
627 (DataType::List(list_type), ValueType::Array) => ListValue::new({
629 let item_type = list_type.elem();
630 let array = value.as_array().unwrap();
631 let mut builder = item_type.create_array_builder(array.len());
632 for v in array {
633 let value = self.parse(v, item_type)?;
634 builder.append(value);
635 }
636 builder.finish()
637 })
638 .into(),
639
640 (DataType::Bytea, ValueType::String) => {
642 let value_str = value.as_str().unwrap();
643
644 match self.bytea_handling {
645 ByteaHandling::Standard => {
646 str_to_bytea(value_str).map_err(|_| create_error())?.into()
647 }
648 ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
649 .decode(value_str)
650 .map_err(|_| create_error())?
651 .into_boxed_slice()
652 .into(),
653 }
654 }
655 (DataType::Jsonb, ValueType::String)
657 if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
658 {
659 match self.handle_toast_columns {
663 true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
664 .map_err(|_| create_error())?
665 .into(),
666 false => JsonbVal::from_str(value.as_str().unwrap())
667 .map_err(|_| create_error())?
668 .into(),
669 }
670 }
671 (DataType::Jsonb, _)
672 if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
673 {
674 let value: serde_json::Value =
675 value.clone().try_into().map_err(|_| create_error())?;
676 JsonbVal::from(value).into()
677 }
678 (
680 DataType::Int256,
681 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
682 ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
683
684 (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
685 .map_err(|_| create_error())?
686 .into(),
687
688 (_expected, _got) => Err(create_error())?,
689 };
690 Ok(DatumCow::Owned(Some(v)))
691 }
692}
693
694fn try_base64_decode_decimal(
702 str_val: &str,
703 bigint_unsigned_handling: BigintUnsignedHandlingMode,
704 create_error: impl Fn() -> AccessError,
705) -> Result<Decimal, AccessError> {
706 match bigint_unsigned_handling {
707 BigintUnsignedHandlingMode::Precise => {
708 let value = base64::engine::general_purpose::STANDARD
713 .decode(str_val)
714 .map_err(|_| create_error())?;
715 let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
716 Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
717 }
718 BigintUnsignedHandlingMode::Long => {
719 Err(create_error())
721 }
722 }
723}
724
/// An `Access` implementation backed by a parsed JSON value.
pub struct JsonAccess<'a> {
    /// Root JSON value that access paths are resolved against.
    value: BorrowedValue<'a>,
    /// Options applied when converting the located value into a datum.
    options: &'a JsonParseOptions,
}
729
730impl<'a> JsonAccess<'a> {
731 pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
732 Self { value, options }
733 }
734
735 pub fn new(value: BorrowedValue<'a>) -> Self {
736 Self::new_with_options(value, &JsonParseOptions::DEFAULT)
737 }
738}
739
740impl Access for JsonAccess<'_> {
741 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
742 let mut value = &self.value;
743
744 for (idx, &key) in path.iter().enumerate() {
745 if let Some(sub_value) = if self.options.ignoring_keycase {
746 json_object_get_case_insensitive(value, key)
747 } else {
748 value.get(key)
749 } {
750 value = sub_value;
751 } else {
752 Err(AccessError::Undefined {
753 name: key.to_owned(),
754 path: path.iter().take(idx).join("."),
755 })?;
756 }
757 }
758
759 self.options.parse(value, type_expected)
760 }
761}
762
763fn json_object_get_case_insensitive<'b>(
767 v: &'b simd_json::BorrowedValue<'b>,
768 key: &str,
769) -> Option<&'b simd_json::BorrowedValue<'b>> {
770 let obj = v.as_object()?;
771 let value = obj.get(key);
772 if value.is_some() {
773 return value; }
775 for (k, v) in obj {
776 if k.eq_ignore_ascii_case(key) {
777 return Some(v);
778 }
779 }
780 None
781}