1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25 DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26 ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31 TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded when the target column is `bytea`.
#[derive(Debug, Clone)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string as base64 using the standard alphabet.
    Base64,
}
/// How a JSON integer is interpreted when the target column is `time`.
///
/// JSON strings are always parsed as `Time` text regardless of this setting.
#[derive(Debug, Clone)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli` (milliseconds).
    Milli,
    /// The integer is passed to `Time::with_micro` (microseconds).
    Micro,
}
51
/// How JSON strings and integers are interpreted when the target column is
/// `timestamptz`.
///
/// Strings are parsed as `Timestamptz` text in every mode except `UtcWithoutSuffix`.
#[derive(Debug, Clone)]
pub enum TimestamptzHandling {
    /// Strings are parsed as `Timestamptz`; integers are rejected.
    UtcString,
    /// Strings are parsed as a naive `Timestamp` and taken to be in UTC; integers are
    /// rejected.
    UtcWithoutSuffix,
    /// Integers are milliseconds, converted with `Timestamptz::from_millis`.
    Milli,
    /// Integers are microseconds, converted with `Timestamptz::from_micros`.
    Micro,
    /// Integers are converted with `i64_to_timestamptz`, which presumably infers the
    /// unit from the magnitude of the number — see that function.
    GuessNumberUnit,
}
69
70impl TimestamptzHandling {
71 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
72
73 pub fn from_options(
74 options: &std::collections::BTreeMap<String, String>,
75 ) -> Result<Option<Self>, InvalidOptionError> {
76 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
77 Some("utc_string") => Self::UtcString,
78 Some("utc_without_suffix") => Self::UtcWithoutSuffix,
79 Some("micro") => Self::Micro,
80 Some("milli") => Self::Milli,
81 Some("guess_number_unit") => Self::GuessNumberUnit,
82 Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
83 None => return Ok(None),
84 };
85 Ok(Some(mode))
86 }
87}
88
/// How a JSON integer is interpreted when the target column is `timestamp`.
///
/// JSON strings are always parsed as `Timestamp` text regardless of this setting.
#[derive(Debug, Clone)]
pub enum TimestampHandling {
    /// The integer is milliseconds, converted with `Timestamp::with_millis`.
    Milli,
    /// The integer is converted with `i64_to_timestamp`, which presumably infers the
    /// unit from the magnitude of the number — see that function.
    GuessNumberUnit,
}
94
/// Which JSON values are accepted when the target column is `jsonb`.
#[derive(Debug, Clone)]
pub enum JsonValueHandling {
    /// Any JSON value (object, array or scalar) is converted to `jsonb` as-is.
    AsValue,
    /// The JSON value must be a string holding JSON text, which is parsed into `jsonb`.
    AsString,
}
/// Which JSON values are accepted for `smallint`/`int`/`bigint`/`real`/`double`
/// columns.
#[derive(Debug, Clone)]
pub enum NumericHandling {
    /// Integer columns accept only JSON integers; float columns accept only JSON floats.
    Strict,
    /// Float columns additionally accept JSON integers.
    Relax {
        /// When set, these columns also accept JSON strings that parse as the target
        /// type.
        string_parsing: bool,
    },
}
/// Which JSON values are accepted for `boolean` columns.
#[derive(Debug, Clone)]
pub enum BooleanHandling {
    /// Only JSON `true`/`false`.
    Strict,
    /// JSON booleans plus the JSON integers `0` and `1`.
    Relax {
        /// Also accept the strings `"true"`/`"false"`, compared case-insensitively.
        string_parsing: bool,
        /// Also accept the strings `"1"`/`"0"`; only effective when `string_parsing`
        /// is set as well.
        string_integer_parsing: bool,
    },
}
120
/// Which JSON values are accepted for `varchar` columns.
#[derive(Debug, Clone)]
pub enum VarcharHandling {
    /// Only JSON strings.
    Strict,
    /// JSON strings, plus booleans and numbers rendered as their JSON text.
    OnlyPrimaryTypes,
    /// Everything `OnlyPrimaryTypes` accepts, plus arrays and objects rendered as their
    /// JSON text.
    AllTypes,
}
130
/// Which JSON values are accepted for struct columns.
#[derive(Debug, Clone)]
pub enum StructHandling {
    /// Only JSON objects.
    Strict,
    /// JSON objects, plus strings whose content is JSON; the string is parsed and the
    /// result is converted as if it had appeared inline.
    AllowJsonString,
}
139
140#[derive(Clone, Debug)]
141pub struct JsonParseOptions {
142 pub bytea_handling: ByteaHandling,
143 pub time_handling: TimeHandling,
144 pub timestamp_handling: TimestampHandling,
145 pub timestamptz_handling: TimestamptzHandling,
146 pub json_value_handling: JsonValueHandling,
147 pub numeric_handling: NumericHandling,
148 pub boolean_handling: BooleanHandling,
149 pub varchar_handling: VarcharHandling,
150 pub struct_handling: StructHandling,
151 pub ignoring_keycase: bool,
152}
153
154impl Default for JsonParseOptions {
155 fn default() -> Self {
156 Self::DEFAULT.clone()
157 }
158}
159
160impl JsonParseOptions {
161 pub const CANAL: JsonParseOptions = JsonParseOptions {
162 bytea_handling: ByteaHandling::Standard,
163 time_handling: TimeHandling::Micro,
164 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
167 numeric_handling: NumericHandling::Relax {
168 string_parsing: true,
169 },
170 boolean_handling: BooleanHandling::Relax {
171 string_parsing: true,
172 string_integer_parsing: true,
173 },
174 varchar_handling: VarcharHandling::Strict,
175 struct_handling: StructHandling::Strict,
176 ignoring_keycase: true,
177 };
178 pub const DEFAULT: JsonParseOptions = JsonParseOptions {
179 bytea_handling: ByteaHandling::Standard,
180 time_handling: TimeHandling::Micro,
181 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
184 numeric_handling: NumericHandling::Relax {
185 string_parsing: true,
186 },
187 boolean_handling: BooleanHandling::Strict,
188 varchar_handling: VarcharHandling::OnlyPrimaryTypes,
189 struct_handling: StructHandling::AllowJsonString,
190 ignoring_keycase: true,
191 };
192
193 pub fn new_for_debezium(
194 timestamptz_handling: TimestamptzHandling,
195 timestamp_handling: TimestampHandling,
196 time_handling: TimeHandling,
197 ) -> Self {
198 Self {
199 bytea_handling: ByteaHandling::Base64,
200 time_handling,
201 timestamp_handling,
202 timestamptz_handling,
203 json_value_handling: JsonValueHandling::AsString,
204 numeric_handling: NumericHandling::Relax {
205 string_parsing: false,
206 },
207 boolean_handling: BooleanHandling::Relax {
208 string_parsing: false,
209 string_integer_parsing: false,
210 },
211 varchar_handling: VarcharHandling::Strict,
212 struct_handling: StructHandling::Strict,
213 ignoring_keycase: true,
214 }
215 }
216
217 pub fn parse<'a>(
218 &self,
219 value: &'a BorrowedValue<'a>,
220 type_expected: &DataType,
221 ) -> AccessResult<DatumCow<'a>> {
222 let create_error = || AccessError::TypeError {
223 expected: format!("{:?}", type_expected),
224 got: value.value_type().to_string(),
225 value: value.to_string(),
226 };
227
228 let v: ScalarImpl = match (type_expected, value.value_type()) {
229 (_, ValueType::Null) => return Ok(DatumCow::NULL),
230 (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
232
233 (
234 DataType::Boolean,
235 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
236 ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
237 && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
238 {
239 (value.as_i64() == Some(1i64)).into()
240 }
241
242 (DataType::Boolean, ValueType::String)
243 if matches!(
244 self.boolean_handling,
245 BooleanHandling::Relax {
246 string_parsing: true,
247 ..
248 }
249 ) =>
250 {
251 match value.as_str().unwrap().to_lowercase().as_str() {
252 "true" => true.into(),
253 "false" => false.into(),
254 c @ ("1" | "0")
255 if matches!(
256 self.boolean_handling,
257 BooleanHandling::Relax {
258 string_parsing: true,
259 string_integer_parsing: true
260 }
261 ) =>
262 {
263 if c == "1" {
264 true.into()
265 } else {
266 false.into()
267 }
268 }
269 _ => Err(create_error())?,
270 }
271 }
272 (
274 DataType::Int16,
275 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
276 ) => value.try_as_i16().map_err(|_| create_error())?.into(),
277
278 (DataType::Int16, ValueType::String)
279 if matches!(
280 self.numeric_handling,
281 NumericHandling::Relax {
282 string_parsing: true
283 }
284 ) =>
285 {
286 value
287 .as_str()
288 .unwrap()
289 .parse::<i16>()
290 .map_err(|_| create_error())?
291 .into()
292 }
293 (
295 DataType::Int32,
296 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
297 ) => value.try_as_i32().map_err(|_| create_error())?.into(),
298
299 (DataType::Int32, ValueType::String)
300 if matches!(
301 self.numeric_handling,
302 NumericHandling::Relax {
303 string_parsing: true
304 }
305 ) =>
306 {
307 value
308 .as_str()
309 .unwrap()
310 .parse::<i32>()
311 .map_err(|_| create_error())?
312 .into()
313 }
314 (
316 DataType::Int64,
317 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
318 ) => value.try_as_i64().map_err(|_| create_error())?.into(),
319
320 (DataType::Int64, ValueType::String)
321 if matches!(
322 self.numeric_handling,
323 NumericHandling::Relax {
324 string_parsing: true
325 }
326 ) =>
327 {
328 value
329 .as_str()
330 .unwrap()
331 .parse::<i64>()
332 .map_err(|_| create_error())?
333 .into()
334 }
335 (
337 DataType::Float32,
338 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
339 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
340 (value.try_as_i64().map_err(|_| create_error())? as f32).into()
341 }
342 (DataType::Float32, ValueType::String)
343 if matches!(
344 self.numeric_handling,
345 NumericHandling::Relax {
346 string_parsing: true
347 }
348 ) =>
349 {
350 value
351 .as_str()
352 .unwrap()
353 .parse::<f32>()
354 .map_err(|_| create_error())?
355 .into()
356 }
357 (DataType::Float32, ValueType::F64) => {
358 value.try_as_f32().map_err(|_| create_error())?.into()
359 }
360 (
362 DataType::Float64,
363 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
364 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
365 (value.try_as_i64().map_err(|_| create_error())? as f64).into()
366 }
367 (DataType::Float64, ValueType::String)
368 if matches!(
369 self.numeric_handling,
370 NumericHandling::Relax {
371 string_parsing: true
372 }
373 ) =>
374 {
375 value
376 .as_str()
377 .unwrap()
378 .parse::<f64>()
379 .map_err(|_| create_error())?
380 .into()
381 }
382 (DataType::Float64, ValueType::F64) => {
383 value.try_as_f64().map_err(|_| create_error())?.into()
384 }
385 (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
387 Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
388 .map_err(|_| create_error())?
389 .into()
390 }
391 (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
392 Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
393 }
394
395 (DataType::Decimal, ValueType::F64) => {
396 Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
397 .map_err(|_| create_error())?
398 .into()
399 }
400 (DataType::Decimal, ValueType::String) => {
401 let str = value.as_str().unwrap();
402 match str {
404 "NAN" => ScalarImpl::Decimal(Decimal::NaN),
405 "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
406 "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
407 _ => {
408 ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?)
409 }
410 }
411 }
412 (DataType::Decimal, ValueType::Object) => {
413 let scale = value
416 .get("scale")
417 .ok_or_else(create_error)?
418 .as_i32()
419 .unwrap();
420 let value = value
421 .get("value")
422 .ok_or_else(create_error)?
423 .as_str()
424 .unwrap()
425 .as_bytes();
426 let unscaled = BigInt::from_signed_bytes_be(value);
427 let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
428 ScalarImpl::Decimal(Decimal::Normalized(decimal))
429 }
430 (
432 DataType::Date,
433 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
434 ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
435 .map_err(|_| create_error())?
436 .into(),
437 (DataType::Date, ValueType::String) => value
438 .as_str()
439 .unwrap()
440 .parse::<Date>()
441 .map_err(|_| create_error())?
442 .into(),
443 (DataType::Varchar, ValueType::String) => {
445 return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
446 }
447 (
448 DataType::Varchar,
449 ValueType::Bool
450 | ValueType::I64
451 | ValueType::I128
452 | ValueType::U64
453 | ValueType::U128
454 | ValueType::F64,
455 ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
456 value.to_string().into()
457 }
458 (
459 DataType::Varchar,
460 ValueType::Bool
461 | ValueType::I64
462 | ValueType::I128
463 | ValueType::U64
464 | ValueType::U128
465 | ValueType::F64
466 | ValueType::Array
467 | ValueType::Object,
468 ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
469 value.to_string().into()
470 }
471 (DataType::Time, ValueType::String) => value
473 .as_str()
474 .unwrap()
475 .parse::<Time>()
476 .map_err(|_| create_error())?
477 .into(),
478 (
479 DataType::Time,
480 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
481 ) => value
482 .as_i64()
483 .map(|i| match self.time_handling {
484 TimeHandling::Milli => Time::with_milli(i as u32),
485 TimeHandling::Micro => Time::with_micro(i as u64),
486 })
487 .unwrap()
488 .map_err(|_| create_error())?
489 .into(),
490 (DataType::Timestamp, ValueType::String) => value
492 .as_str()
493 .unwrap()
494 .parse::<Timestamp>()
495 .map_err(|_| create_error())?
496 .into(),
497 (
498 DataType::Timestamp,
499 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
500 ) => {
501 match self.timestamp_handling {
502 TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
505 .map_err(|_| create_error())?
506 .into(),
507 TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
508 .map_err(|_| create_error())?
509 .into(),
510 }
511 }
512 (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
514 TimestamptzHandling::UtcWithoutSuffix => value
515 .as_str()
516 .unwrap()
517 .parse::<Timestamp>()
518 .map(|naive_utc| {
519 Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
520 })
521 .map_err(|_| create_error())?
522 .into(),
523 _ => value
525 .as_str()
526 .unwrap()
527 .parse::<Timestamptz>()
528 .map_err(|_| create_error())?
529 .into(),
530 },
531 (
532 DataType::Timestamptz,
533 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
534 ) => value
535 .as_i64()
536 .and_then(|num| match self.timestamptz_handling {
537 TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
538 TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
539 TimestamptzHandling::Milli => Timestamptz::from_millis(num),
540 TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
542 })
543 .ok_or_else(create_error)?
544 .into(),
545 (DataType::Interval, ValueType::String) => value
547 .as_str()
548 .unwrap()
549 .parse::<Interval>()
550 .map_err(|_| create_error())?
551 .into(),
552 (DataType::Struct(struct_type_info), ValueType::Object) => {
554 let mut fields = Vec::with_capacity(struct_type_info.len());
557 for (field_name, field_type) in struct_type_info.iter() {
558 let field_value = json_object_get_case_insensitive(value, field_name)
559 .unwrap_or_else(|| {
560 let error = AccessError::Undefined {
561 name: field_name.to_owned(),
562 path: struct_type_info.to_string(), };
564 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
566 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
567 tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
568 }
569 &BorrowedValue::Static(simd_json::StaticNode::Null)
570 });
571 fields.push(
572 self.parse(field_value, field_type)
573 .map(|d| d.to_owned_datum())?,
574 );
575 }
576 StructValue::new(fields).into()
577 }
578
579 (DataType::Struct(_), ValueType::String)
582 if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
583 {
584 let mut value = value.as_str().unwrap().as_bytes().to_vec();
586 let value =
587 simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
588 return self
589 .parse(&value, type_expected)
590 .map(|d| d.to_owned_datum().into());
591 }
592
593 (DataType::List(item_type), ValueType::Array) => ListValue::new({
595 let array = value.as_array().unwrap();
596 let mut builder = item_type.create_array_builder(array.len());
597 for v in array {
598 let value = self.parse(v, item_type)?;
599 builder.append(value);
600 }
601 builder.finish()
602 })
603 .into(),
604
605 (DataType::Bytea, ValueType::String) => match self.bytea_handling {
607 ByteaHandling::Standard => str_to_bytea(value.as_str().unwrap())
608 .map_err(|_| create_error())?
609 .into(),
610 ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
611 .decode(value.as_str().unwrap())
612 .map_err(|_| create_error())?
613 .into_boxed_slice()
614 .into(),
615 },
616 (DataType::Jsonb, ValueType::String)
618 if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
619 {
620 JsonbVal::from_str(value.as_str().unwrap())
621 .map_err(|_| create_error())?
622 .into()
623 }
624 (DataType::Jsonb, _)
625 if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
626 {
627 let value: serde_json::Value =
628 value.clone().try_into().map_err(|_| create_error())?;
629 JsonbVal::from(value).into()
630 }
631 (
633 DataType::Int256,
634 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
635 ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
636
637 (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
638 .map_err(|_| create_error())?
639 .into(),
640
641 (_expected, _got) => Err(create_error())?,
642 };
643 Ok(DatumCow::Owned(Some(v)))
644 }
645}
646
647pub struct JsonAccess<'a> {
648 value: BorrowedValue<'a>,
649 options: &'a JsonParseOptions,
650}
651
652impl<'a> JsonAccess<'a> {
653 pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
654 Self { value, options }
655 }
656
657 pub fn new(value: BorrowedValue<'a>) -> Self {
658 Self::new_with_options(value, &JsonParseOptions::DEFAULT)
659 }
660}
661
662impl Access for JsonAccess<'_> {
663 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
664 let mut value = &self.value;
665
666 for (idx, &key) in path.iter().enumerate() {
667 if let Some(sub_value) = if self.options.ignoring_keycase {
668 json_object_get_case_insensitive(value, key)
669 } else {
670 value.get(key)
671 } {
672 value = sub_value;
673 } else {
674 Err(AccessError::Undefined {
675 name: key.to_owned(),
676 path: path.iter().take(idx).join("."),
677 })?;
678 }
679 }
680
681 self.options.parse(value, type_expected)
682 }
683}
684
685fn json_object_get_case_insensitive<'b>(
689 v: &'b simd_json::BorrowedValue<'b>,
690 key: &str,
691) -> Option<&'b simd_json::BorrowedValue<'b>> {
692 let obj = v.as_object()?;
693 let value = obj.get(key);
694 if value.is_some() {
695 return value; }
697 for (k, v) in obj {
698 if k.eq_ignore_ascii_case(key) {
699 return Some(v);
700 }
701 }
702 None
703}