1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25 DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26 ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31 TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded when the target column type is `bytea`.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string with the standard base64 alphabet.
    Base64,
}
/// Unit used to interpret a JSON integer when the target column type is `time`.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
51
/// How a JSON value is interpreted when the target column type is `timestamptz`.
///
/// String inputs are parsed as `Timestamptz` for every mode except
/// [`TimestamptzHandling::UtcWithoutSuffix`]. Integer inputs are only accepted by the
/// numeric modes (`Milli`, `Micro`, `GuessNumberUnit`).
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// Strings are parsed as `Timestamptz`; integer inputs are rejected.
    UtcString,
    /// Strings are parsed as a naive `Timestamp` and then treated as UTC; integer inputs
    /// are rejected.
    UtcWithoutSuffix,
    /// Integers are converted with `Timestamptz::from_millis`.
    Milli,
    /// Integers are converted with `Timestamptz::from_micros`.
    Micro,
    /// Integers are converted with `i64_to_timestamptz`, which picks the unit itself.
    GuessNumberUnit,
}
69
70impl TimestamptzHandling {
71 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
72
73 pub fn from_options(
74 options: &std::collections::BTreeMap<String, String>,
75 ) -> Result<Option<Self>, InvalidOptionError> {
76 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
77 Some("utc_string") => Self::UtcString,
78 Some("utc_without_suffix") => Self::UtcWithoutSuffix,
79 Some("micro") => Self::Micro,
80 Some("milli") => Self::Milli,
81 Some("guess_number_unit") => Self::GuessNumberUnit,
82 Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
83 None => return Ok(None),
84 };
85 Ok(Some(mode))
86 }
87}
88
/// How a JSON integer is interpreted when the target column type is `timestamp`.
#[derive(Clone, Debug)]
pub enum TimestampHandling {
    /// The integer is passed to `Timestamp::with_millis`.
    Milli,
    /// The integer is converted with `i64_to_timestamp`, which picks the unit itself.
    GuessNumberUnit,
}
94
/// How input is accepted when the target column type is `jsonb`.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any JSON value is converted into the `jsonb` datum as-is.
    AsValue,
    /// Only a JSON string is accepted, and its text is parsed into the `jsonb` datum.
    AsString,
}
/// How leniently JSON values are accepted for integer and float column types.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// Float columns only accept JSON floats, and numeric strings are rejected for
    /// integer and float columns.
    Strict,
    /// Float columns additionally accept JSON integers.
    Relax {
        /// When `true`, JSON strings are parsed into integer and float columns.
        string_parsing: bool,
    },
}
/// How leniently JSON values are accepted for boolean columns.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// The JSON integers `0` and `1` are accepted in addition to JSON booleans.
    Relax {
        /// When `true`, the strings `"true"` / `"false"` (compared case-insensitively)
        /// are accepted.
        string_parsing: bool,
        /// When `true` together with `string_parsing`, the strings `"1"` / `"0"` are
        /// accepted as well.
        string_integer_parsing: bool,
    },
}
120
/// Which non-string JSON values are stringified into `varchar` columns.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Only JSON strings are accepted.
    Strict,
    /// Booleans, integers and floats are additionally accepted and stringified.
    OnlyPrimaryTypes,
    /// Everything accepted by `OnlyPrimaryTypes`, plus arrays and objects.
    AllTypes,
}
130
/// Which JSON values are accepted for struct columns.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only JSON objects are accepted.
    Strict,
    /// A JSON string is also accepted: its content is parsed as JSON and then handled
    /// like any other value targeting the struct type.
    AllowJsonString,
}
139
140#[derive(Clone, Debug)]
141pub struct JsonParseOptions {
142 pub bytea_handling: ByteaHandling,
143 pub time_handling: TimeHandling,
144 pub timestamp_handling: TimestampHandling,
145 pub timestamptz_handling: TimestamptzHandling,
146 pub json_value_handling: JsonValueHandling,
147 pub numeric_handling: NumericHandling,
148 pub boolean_handling: BooleanHandling,
149 pub varchar_handling: VarcharHandling,
150 pub struct_handling: StructHandling,
151 pub ignoring_keycase: bool,
152 pub handle_toast_columns: bool,
153}
154
155impl Default for JsonParseOptions {
156 fn default() -> Self {
157 Self::DEFAULT.clone()
158 }
159}
160
161impl JsonParseOptions {
162 pub const CANAL: JsonParseOptions = JsonParseOptions {
163 bytea_handling: ByteaHandling::Standard,
164 time_handling: TimeHandling::Micro,
165 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
168 numeric_handling: NumericHandling::Relax {
169 string_parsing: true,
170 },
171 boolean_handling: BooleanHandling::Relax {
172 string_parsing: true,
173 string_integer_parsing: true,
174 },
175 varchar_handling: VarcharHandling::Strict,
176 struct_handling: StructHandling::Strict,
177 ignoring_keycase: true,
178 handle_toast_columns: false,
179 };
180 pub const DEFAULT: JsonParseOptions = JsonParseOptions {
181 bytea_handling: ByteaHandling::Standard,
182 time_handling: TimeHandling::Micro,
183 timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
186 numeric_handling: NumericHandling::Relax {
187 string_parsing: true,
188 },
189 boolean_handling: BooleanHandling::Strict,
190 varchar_handling: VarcharHandling::OnlyPrimaryTypes,
191 struct_handling: StructHandling::AllowJsonString,
192 ignoring_keycase: true,
193 handle_toast_columns: false,
194 };
195
196 pub fn new_for_debezium(
197 timestamptz_handling: TimestamptzHandling,
198 timestamp_handling: TimestampHandling,
199 time_handling: TimeHandling,
200 handle_toast_columns: bool,
201 ) -> Self {
202 Self {
203 bytea_handling: ByteaHandling::Base64,
204 time_handling,
205 timestamp_handling,
206 timestamptz_handling,
207 json_value_handling: JsonValueHandling::AsString,
208 numeric_handling: NumericHandling::Relax {
209 string_parsing: false,
210 },
211 boolean_handling: BooleanHandling::Relax {
212 string_parsing: false,
213 string_integer_parsing: false,
214 },
215 varchar_handling: VarcharHandling::Strict,
216 struct_handling: StructHandling::Strict,
217 ignoring_keycase: true,
218 handle_toast_columns,
219 }
220 }
221
222 pub fn parse<'a>(
223 &self,
224 value: &'a BorrowedValue<'a>,
225 type_expected: &DataType,
226 ) -> AccessResult<DatumCow<'a>> {
227 let create_error = || AccessError::TypeError {
228 expected: format!("{:?}", type_expected),
229 got: value.value_type().to_string(),
230 value: value.to_string(),
231 };
232 let v: ScalarImpl = match (type_expected, value.value_type()) {
233 (_, ValueType::Null) => return Ok(DatumCow::NULL),
234 (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
236
237 (
238 DataType::Boolean,
239 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
240 ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
241 && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
242 {
243 (value.as_i64() == Some(1i64)).into()
244 }
245
246 (DataType::Boolean, ValueType::String)
247 if matches!(
248 self.boolean_handling,
249 BooleanHandling::Relax {
250 string_parsing: true,
251 ..
252 }
253 ) =>
254 {
255 match value.as_str().unwrap().to_lowercase().as_str() {
256 "true" => true.into(),
257 "false" => false.into(),
258 c @ ("1" | "0")
259 if matches!(
260 self.boolean_handling,
261 BooleanHandling::Relax {
262 string_parsing: true,
263 string_integer_parsing: true
264 }
265 ) =>
266 {
267 if c == "1" {
268 true.into()
269 } else {
270 false.into()
271 }
272 }
273 _ => Err(create_error())?,
274 }
275 }
276 (
278 DataType::Int16,
279 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
280 ) => value.try_as_i16().map_err(|_| create_error())?.into(),
281
282 (DataType::Int16, ValueType::String)
283 if matches!(
284 self.numeric_handling,
285 NumericHandling::Relax {
286 string_parsing: true
287 }
288 ) =>
289 {
290 value
291 .as_str()
292 .unwrap()
293 .parse::<i16>()
294 .map_err(|_| create_error())?
295 .into()
296 }
297 (
299 DataType::Int32,
300 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
301 ) => value.try_as_i32().map_err(|_| create_error())?.into(),
302
303 (DataType::Int32, ValueType::String)
304 if matches!(
305 self.numeric_handling,
306 NumericHandling::Relax {
307 string_parsing: true
308 }
309 ) =>
310 {
311 value
312 .as_str()
313 .unwrap()
314 .parse::<i32>()
315 .map_err(|_| create_error())?
316 .into()
317 }
318 (
320 DataType::Int64,
321 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
322 ) => value.try_as_i64().map_err(|_| create_error())?.into(),
323
324 (DataType::Int64, ValueType::String)
325 if matches!(
326 self.numeric_handling,
327 NumericHandling::Relax {
328 string_parsing: true
329 }
330 ) =>
331 {
332 value
333 .as_str()
334 .unwrap()
335 .parse::<i64>()
336 .map_err(|_| create_error())?
337 .into()
338 }
339 (
341 DataType::Float32,
342 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
343 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
344 (value.try_as_i64().map_err(|_| create_error())? as f32).into()
345 }
346 (DataType::Float32, ValueType::String)
347 if matches!(
348 self.numeric_handling,
349 NumericHandling::Relax {
350 string_parsing: true
351 }
352 ) =>
353 {
354 value
355 .as_str()
356 .unwrap()
357 .parse::<f32>()
358 .map_err(|_| create_error())?
359 .into()
360 }
361 (DataType::Float32, ValueType::F64) => {
362 value.try_as_f32().map_err(|_| create_error())?.into()
363 }
364 (
366 DataType::Float64,
367 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
368 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
369 (value.try_as_i64().map_err(|_| create_error())? as f64).into()
370 }
371 (DataType::Float64, ValueType::String)
372 if matches!(
373 self.numeric_handling,
374 NumericHandling::Relax {
375 string_parsing: true
376 }
377 ) =>
378 {
379 value
380 .as_str()
381 .unwrap()
382 .parse::<f64>()
383 .map_err(|_| create_error())?
384 .into()
385 }
386 (DataType::Float64, ValueType::F64) => {
387 value.try_as_f64().map_err(|_| create_error())?.into()
388 }
389 (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
391 Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
392 .map_err(|_| create_error())?
393 .into()
394 }
395 (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
396 Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
397 }
398
399 (DataType::Decimal, ValueType::F64) => {
400 Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
401 .map_err(|_| create_error())?
402 .into()
403 }
404 (DataType::Decimal, ValueType::String) => {
405 let str = value.as_str().unwrap();
406 match str {
408 "NAN" => ScalarImpl::Decimal(Decimal::NaN),
409 "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
410 "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
411 _ => {
412 ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?)
413 }
414 }
415 }
416 (DataType::Decimal, ValueType::Object) => {
417 let scale = value
420 .get("scale")
421 .ok_or_else(create_error)?
422 .as_i32()
423 .unwrap();
424 let value = value
425 .get("value")
426 .ok_or_else(create_error)?
427 .as_str()
428 .unwrap()
429 .as_bytes();
430 let unscaled = BigInt::from_signed_bytes_be(value);
431 let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
432 ScalarImpl::Decimal(Decimal::Normalized(decimal))
433 }
434 (
436 DataType::Date,
437 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
438 ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
439 .map_err(|_| create_error())?
440 .into(),
441 (DataType::Date, ValueType::String) => value
442 .as_str()
443 .unwrap()
444 .parse::<Date>()
445 .map_err(|_| create_error())?
446 .into(),
447 (DataType::Varchar, ValueType::String) => {
449 return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
450 }
451 (
452 DataType::Varchar,
453 ValueType::Bool
454 | ValueType::I64
455 | ValueType::I128
456 | ValueType::U64
457 | ValueType::U128
458 | ValueType::F64,
459 ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
460 value.to_string().into()
461 }
462 (
463 DataType::Varchar,
464 ValueType::Bool
465 | ValueType::I64
466 | ValueType::I128
467 | ValueType::U64
468 | ValueType::U128
469 | ValueType::F64
470 | ValueType::Array
471 | ValueType::Object,
472 ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
473 value.to_string().into()
474 }
475 (DataType::Time, ValueType::String) => value
477 .as_str()
478 .unwrap()
479 .parse::<Time>()
480 .map_err(|_| create_error())?
481 .into(),
482 (
483 DataType::Time,
484 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
485 ) => value
486 .as_i64()
487 .map(|i| match self.time_handling {
488 TimeHandling::Milli => Time::with_milli(i as u32),
489 TimeHandling::Micro => Time::with_micro(i as u64),
490 })
491 .unwrap()
492 .map_err(|_| create_error())?
493 .into(),
494 (DataType::Timestamp, ValueType::String) => value
496 .as_str()
497 .unwrap()
498 .parse::<Timestamp>()
499 .map_err(|_| create_error())?
500 .into(),
501 (
502 DataType::Timestamp,
503 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
504 ) => {
505 match self.timestamp_handling {
506 TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
509 .map_err(|_| create_error())?
510 .into(),
511 TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
512 .map_err(|_| create_error())?
513 .into(),
514 }
515 }
516 (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
518 TimestamptzHandling::UtcWithoutSuffix => value
519 .as_str()
520 .unwrap()
521 .parse::<Timestamp>()
522 .map(|naive_utc| {
523 Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
524 })
525 .map_err(|_| create_error())?
526 .into(),
527 _ => value
529 .as_str()
530 .unwrap()
531 .parse::<Timestamptz>()
532 .map_err(|_| create_error())?
533 .into(),
534 },
535 (
536 DataType::Timestamptz,
537 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
538 ) => value
539 .as_i64()
540 .and_then(|num| match self.timestamptz_handling {
541 TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
542 TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
543 TimestamptzHandling::Milli => Timestamptz::from_millis(num),
544 TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
546 })
547 .ok_or_else(create_error)?
548 .into(),
549 (DataType::Interval, ValueType::String) => value
551 .as_str()
552 .unwrap()
553 .parse::<Interval>()
554 .map_err(|_| create_error())?
555 .into(),
556 (DataType::Struct(struct_type_info), ValueType::Object) => {
558 let mut fields = Vec::with_capacity(struct_type_info.len());
561 for (field_name, field_type) in struct_type_info.iter() {
562 let field_value = json_object_get_case_insensitive(value, field_name)
563 .unwrap_or_else(|| {
564 let error = AccessError::Undefined {
565 name: field_name.to_owned(),
566 path: struct_type_info.to_string(), };
568 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
570 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
571 tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
572 }
573 &BorrowedValue::Static(simd_json::StaticNode::Null)
574 });
575 fields.push(
576 self.parse(field_value, field_type)
577 .map(|d| d.to_owned_datum())?,
578 );
579 }
580 StructValue::new(fields).into()
581 }
582
583 (DataType::Struct(_), ValueType::String)
586 if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
587 {
588 let mut value = value.as_str().unwrap().as_bytes().to_vec();
590 let value =
591 simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
592 return self
593 .parse(&value, type_expected)
594 .map(|d| d.to_owned_datum().into());
595 }
596
597 (DataType::List(item_type), ValueType::Array) => ListValue::new({
599 let array = value.as_array().unwrap();
600 let mut builder = item_type.create_array_builder(array.len());
601 for v in array {
602 let value = self.parse(v, item_type)?;
603 builder.append(value);
604 }
605 builder.finish()
606 })
607 .into(),
608
609 (DataType::Bytea, ValueType::String) => {
611 let value_str = value.as_str().unwrap();
612
613 match self.bytea_handling {
614 ByteaHandling::Standard => {
615 str_to_bytea(value_str).map_err(|_| create_error())?.into()
616 }
617 ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
618 .decode(value_str)
619 .map_err(|_| create_error())?
620 .into_boxed_slice()
621 .into(),
622 }
623 }
624 (DataType::Jsonb, ValueType::String)
626 if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
627 {
628 match self.handle_toast_columns {
632 true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
633 .map_err(|_| create_error())?
634 .into(),
635 false => JsonbVal::from_str(value.as_str().unwrap())
636 .map_err(|_| create_error())?
637 .into(),
638 }
639 }
640 (DataType::Jsonb, _)
641 if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
642 {
643 let value: serde_json::Value =
644 value.clone().try_into().map_err(|_| create_error())?;
645 JsonbVal::from(value).into()
646 }
647 (
649 DataType::Int256,
650 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
651 ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
652
653 (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
654 .map_err(|_| create_error())?
655 .into(),
656
657 (_expected, _got) => Err(create_error())?,
658 };
659 Ok(DatumCow::Owned(Some(v)))
660 }
661}
662
663pub struct JsonAccess<'a> {
664 value: BorrowedValue<'a>,
665 options: &'a JsonParseOptions,
666}
667
668impl<'a> JsonAccess<'a> {
669 pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
670 Self { value, options }
671 }
672
673 pub fn new(value: BorrowedValue<'a>) -> Self {
674 Self::new_with_options(value, &JsonParseOptions::DEFAULT)
675 }
676}
677
678impl Access for JsonAccess<'_> {
679 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
680 let mut value = &self.value;
681
682 for (idx, &key) in path.iter().enumerate() {
683 if let Some(sub_value) = if self.options.ignoring_keycase {
684 json_object_get_case_insensitive(value, key)
685 } else {
686 value.get(key)
687 } {
688 value = sub_value;
689 } else {
690 Err(AccessError::Undefined {
691 name: key.to_owned(),
692 path: path.iter().take(idx).join("."),
693 })?;
694 }
695 }
696
697 self.options.parse(value, type_expected)
698 }
699}
700
701fn json_object_get_case_insensitive<'b>(
705 v: &'b simd_json::BorrowedValue<'b>,
706 key: &str,
707) -> Option<&'b simd_json::BorrowedValue<'b>> {
708 let obj = v.as_object()?;
709 let value = obj.get(key);
710 if value.is_some() {
711 return value; }
713 for (k, v) in obj {
714 if k.eq_ignore_ascii_case(key) {
715 return Some(v);
716 }
717 }
718 None
719}