1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25 DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26 ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31 TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// Selects how a JSON string is decoded when the expected type is `bytea`.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string as base64 using the standard engine.
    Base64,
}
/// Selects the unit used when a JSON integer is read as a `time` value.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
51
/// Selects how JSON strings and integers are read as `timestamptz` values.
///
/// Unless stated otherwise, a JSON string is parsed with the `FromStr`
/// implementation of `Timestamptz`.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// Strings are parsed as `Timestamptz`; JSON integers are rejected.
    UtcString,
    /// Strings are parsed as a naive `Timestamp` and interpreted as UTC;
    /// JSON integers are rejected.
    UtcWithoutSuffix,
    /// JSON integers are converted with `Timestamptz::from_millis`.
    Milli,
    /// JSON integers are converted with `Timestamptz::from_micros`.
    Micro,
    /// JSON integers are converted with `i64_to_timestamptz`.
    GuessNumberUnit,
}
69
70impl TimestamptzHandling {
71 pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
72
73 pub fn from_options(
74 options: &std::collections::BTreeMap<String, String>,
75 ) -> Result<Option<Self>, InvalidOptionError> {
76 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
77 Some("utc_string") => Self::UtcString,
78 Some("utc_without_suffix") => Self::UtcWithoutSuffix,
79 Some("micro") => Self::Micro,
80 Some("milli") => Self::Milli,
81 Some("guess_number_unit") => Self::GuessNumberUnit,
82 Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
83 None => return Ok(None),
84 };
85 Ok(Some(mode))
86 }
87}
88
/// Selects which JSON inputs are accepted when the expected type is `jsonb`.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any non-null JSON value is converted directly into a jsonb value.
    AsValue,
    /// Only JSON strings are accepted; their content is parsed as JSON text.
    AsString,
}
/// Selects how leniently JSON values are converted to numeric types.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// JSON integers are not accepted for float columns and JSON strings are
    /// not parsed as integers or floats.
    Strict,
    /// JSON integers are accepted for float columns.
    Relax {
        /// When `true`, JSON strings are additionally parsed for integer and
        /// float columns.
        string_parsing: bool,
    },
}
/// Selects how leniently JSON values are converted to booleans.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON `true` / `false` are accepted.
    Strict,
    /// The JSON integers `0` and `1` are accepted as well.
    Relax {
        /// When `true`, the strings `"true"` and `"false"` are accepted,
        /// compared case-insensitively.
        string_parsing: bool,
        /// When `true` together with `string_parsing`, the strings `"1"` and
        /// `"0"` are accepted too.
        string_integer_parsing: bool,
    },
}
114
/// Selects which non-string JSON values may be read as `varchar`.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Only JSON strings are accepted.
    Strict,
    /// Booleans, integers and floats are also accepted and stringified.
    OnlyPrimaryTypes,
    /// Booleans, integers, floats, arrays and objects are also accepted and
    /// stringified.
    AllTypes,
}
124
/// Selects which JSON inputs are accepted when the expected type is a struct.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only JSON objects are accepted.
    Strict,
    /// A JSON string is also accepted: its content is parsed as JSON and the
    /// result is then read against the same struct type.
    AllowJsonString,
}
133
134#[derive(Clone, Debug)]
135pub struct JsonParseOptions {
136 pub bytea_handling: ByteaHandling,
137 pub time_handling: TimeHandling,
138 pub timestamptz_handling: TimestamptzHandling,
139 pub json_value_handling: JsonValueHandling,
140 pub numeric_handling: NumericHandling,
141 pub boolean_handling: BooleanHandling,
142 pub varchar_handling: VarcharHandling,
143 pub struct_handling: StructHandling,
144 pub ignoring_keycase: bool,
145}
146
147impl Default for JsonParseOptions {
148 fn default() -> Self {
149 Self::DEFAULT.clone()
150 }
151}
152
153impl JsonParseOptions {
154 pub const CANAL: JsonParseOptions = JsonParseOptions {
155 bytea_handling: ByteaHandling::Standard,
156 time_handling: TimeHandling::Micro,
157 timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
159 numeric_handling: NumericHandling::Relax {
160 string_parsing: true,
161 },
162 boolean_handling: BooleanHandling::Relax {
163 string_parsing: true,
164 string_integer_parsing: true,
165 },
166 varchar_handling: VarcharHandling::Strict,
167 struct_handling: StructHandling::Strict,
168 ignoring_keycase: true,
169 };
170 pub const DEFAULT: JsonParseOptions = JsonParseOptions {
171 bytea_handling: ByteaHandling::Standard,
172 time_handling: TimeHandling::Micro,
173 timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
175 numeric_handling: NumericHandling::Relax {
176 string_parsing: true,
177 },
178 boolean_handling: BooleanHandling::Strict,
179 varchar_handling: VarcharHandling::OnlyPrimaryTypes,
180 struct_handling: StructHandling::AllowJsonString,
181 ignoring_keycase: true,
182 };
183
184 pub fn new_for_debezium(timestamptz_handling: TimestamptzHandling) -> Self {
185 Self {
186 bytea_handling: ByteaHandling::Base64,
187 time_handling: TimeHandling::Micro,
188 timestamptz_handling,
189 json_value_handling: JsonValueHandling::AsString,
190 numeric_handling: NumericHandling::Relax {
191 string_parsing: false,
192 },
193 boolean_handling: BooleanHandling::Relax {
194 string_parsing: false,
195 string_integer_parsing: false,
196 },
197 varchar_handling: VarcharHandling::Strict,
198 struct_handling: StructHandling::Strict,
199 ignoring_keycase: true,
200 }
201 }
202
203 pub fn parse<'a>(
204 &self,
205 value: &'a BorrowedValue<'a>,
206 type_expected: &DataType,
207 ) -> AccessResult<DatumCow<'a>> {
208 let create_error = || AccessError::TypeError {
209 expected: format!("{:?}", type_expected),
210 got: value.value_type().to_string(),
211 value: value.to_string(),
212 };
213
214 let v: ScalarImpl = match (type_expected, value.value_type()) {
215 (_, ValueType::Null) => return Ok(DatumCow::NULL),
216 (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
218
219 (
220 DataType::Boolean,
221 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
222 ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
223 && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
224 {
225 (value.as_i64() == Some(1i64)).into()
226 }
227
228 (DataType::Boolean, ValueType::String)
229 if matches!(
230 self.boolean_handling,
231 BooleanHandling::Relax {
232 string_parsing: true,
233 ..
234 }
235 ) =>
236 {
237 match value.as_str().unwrap().to_lowercase().as_str() {
238 "true" => true.into(),
239 "false" => false.into(),
240 c @ ("1" | "0")
241 if matches!(
242 self.boolean_handling,
243 BooleanHandling::Relax {
244 string_parsing: true,
245 string_integer_parsing: true
246 }
247 ) =>
248 {
249 if c == "1" {
250 true.into()
251 } else {
252 false.into()
253 }
254 }
255 _ => Err(create_error())?,
256 }
257 }
258 (
260 DataType::Int16,
261 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
262 ) => value.try_as_i16().map_err(|_| create_error())?.into(),
263
264 (DataType::Int16, ValueType::String)
265 if matches!(
266 self.numeric_handling,
267 NumericHandling::Relax {
268 string_parsing: true
269 }
270 ) =>
271 {
272 value
273 .as_str()
274 .unwrap()
275 .parse::<i16>()
276 .map_err(|_| create_error())?
277 .into()
278 }
279 (
281 DataType::Int32,
282 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
283 ) => value.try_as_i32().map_err(|_| create_error())?.into(),
284
285 (DataType::Int32, ValueType::String)
286 if matches!(
287 self.numeric_handling,
288 NumericHandling::Relax {
289 string_parsing: true
290 }
291 ) =>
292 {
293 value
294 .as_str()
295 .unwrap()
296 .parse::<i32>()
297 .map_err(|_| create_error())?
298 .into()
299 }
300 (
302 DataType::Int64,
303 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
304 ) => value.try_as_i64().map_err(|_| create_error())?.into(),
305
306 (DataType::Int64, ValueType::String)
307 if matches!(
308 self.numeric_handling,
309 NumericHandling::Relax {
310 string_parsing: true
311 }
312 ) =>
313 {
314 value
315 .as_str()
316 .unwrap()
317 .parse::<i64>()
318 .map_err(|_| create_error())?
319 .into()
320 }
321 (
323 DataType::Float32,
324 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
325 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
326 (value.try_as_i64().map_err(|_| create_error())? as f32).into()
327 }
328 (DataType::Float32, ValueType::String)
329 if matches!(
330 self.numeric_handling,
331 NumericHandling::Relax {
332 string_parsing: true
333 }
334 ) =>
335 {
336 value
337 .as_str()
338 .unwrap()
339 .parse::<f32>()
340 .map_err(|_| create_error())?
341 .into()
342 }
343 (DataType::Float32, ValueType::F64) => {
344 value.try_as_f32().map_err(|_| create_error())?.into()
345 }
346 (
348 DataType::Float64,
349 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
350 ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
351 (value.try_as_i64().map_err(|_| create_error())? as f64).into()
352 }
353 (DataType::Float64, ValueType::String)
354 if matches!(
355 self.numeric_handling,
356 NumericHandling::Relax {
357 string_parsing: true
358 }
359 ) =>
360 {
361 value
362 .as_str()
363 .unwrap()
364 .parse::<f64>()
365 .map_err(|_| create_error())?
366 .into()
367 }
368 (DataType::Float64, ValueType::F64) => {
369 value.try_as_f64().map_err(|_| create_error())?.into()
370 }
371 (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
373 Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
374 .map_err(|_| create_error())?
375 .into()
376 }
377 (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
378 Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
379 }
380
381 (DataType::Decimal, ValueType::F64) => {
382 Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
383 .map_err(|_| create_error())?
384 .into()
385 }
386 (DataType::Decimal, ValueType::String) => {
387 let str = value.as_str().unwrap();
388 match str {
390 "NAN" => ScalarImpl::Decimal(Decimal::NaN),
391 "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
392 "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
393 _ => {
394 ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?)
395 }
396 }
397 }
398 (DataType::Decimal, ValueType::Object) => {
399 let scale = value
402 .get("scale")
403 .ok_or_else(create_error)?
404 .as_i32()
405 .unwrap();
406 let value = value
407 .get("value")
408 .ok_or_else(create_error)?
409 .as_str()
410 .unwrap()
411 .as_bytes();
412 let unscaled = BigInt::from_signed_bytes_be(value);
413 let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
414 ScalarImpl::Decimal(Decimal::Normalized(decimal))
415 }
416 (
418 DataType::Date,
419 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
420 ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
421 .map_err(|_| create_error())?
422 .into(),
423 (DataType::Date, ValueType::String) => value
424 .as_str()
425 .unwrap()
426 .parse::<Date>()
427 .map_err(|_| create_error())?
428 .into(),
429 (DataType::Varchar, ValueType::String) => {
431 return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
432 }
433 (
434 DataType::Varchar,
435 ValueType::Bool
436 | ValueType::I64
437 | ValueType::I128
438 | ValueType::U64
439 | ValueType::U128
440 | ValueType::F64,
441 ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
442 value.to_string().into()
443 }
444 (
445 DataType::Varchar,
446 ValueType::Bool
447 | ValueType::I64
448 | ValueType::I128
449 | ValueType::U64
450 | ValueType::U128
451 | ValueType::F64
452 | ValueType::Array
453 | ValueType::Object,
454 ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
455 value.to_string().into()
456 }
457 (DataType::Time, ValueType::String) => value
459 .as_str()
460 .unwrap()
461 .parse::<Time>()
462 .map_err(|_| create_error())?
463 .into(),
464 (
465 DataType::Time,
466 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
467 ) => value
468 .as_i64()
469 .map(|i| match self.time_handling {
470 TimeHandling::Milli => Time::with_milli(i as u32),
471 TimeHandling::Micro => Time::with_micro(i as u64),
472 })
473 .unwrap()
474 .map_err(|_| create_error())?
475 .into(),
476 (DataType::Timestamp, ValueType::String) => value
478 .as_str()
479 .unwrap()
480 .parse::<Timestamp>()
481 .map_err(|_| create_error())?
482 .into(),
483 (
484 DataType::Timestamp,
485 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
486 ) => i64_to_timestamp(value.as_i64().unwrap())
487 .map_err(|_| create_error())?
488 .into(),
489 (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
491 TimestamptzHandling::UtcWithoutSuffix => value
492 .as_str()
493 .unwrap()
494 .parse::<Timestamp>()
495 .map(|naive_utc| {
496 Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
497 })
498 .map_err(|_| create_error())?
499 .into(),
500 _ => value
502 .as_str()
503 .unwrap()
504 .parse::<Timestamptz>()
505 .map_err(|_| create_error())?
506 .into(),
507 },
508 (
509 DataType::Timestamptz,
510 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
511 ) => value
512 .as_i64()
513 .and_then(|num| match self.timestamptz_handling {
514 TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
515 TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
516 TimestamptzHandling::Milli => Timestamptz::from_millis(num),
517 TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
519 })
520 .ok_or_else(create_error)?
521 .into(),
522 (DataType::Interval, ValueType::String) => value
524 .as_str()
525 .unwrap()
526 .parse::<Interval>()
527 .map_err(|_| create_error())?
528 .into(),
529 (DataType::Struct(struct_type_info), ValueType::Object) => {
531 let mut fields = Vec::with_capacity(struct_type_info.len());
534 for (field_name, field_type) in struct_type_info.iter() {
535 let field_value = json_object_get_case_insensitive(value, field_name)
536 .unwrap_or_else(|| {
537 let error = AccessError::Undefined {
538 name: field_name.to_owned(),
539 path: struct_type_info.to_string(), };
541 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
543 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
544 tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
545 }
546 &BorrowedValue::Static(simd_json::StaticNode::Null)
547 });
548 fields.push(
549 self.parse(field_value, field_type)
550 .map(|d| d.to_owned_datum())?,
551 );
552 }
553 StructValue::new(fields).into()
554 }
555
556 (DataType::Struct(_), ValueType::String)
559 if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
560 {
561 let mut value = value.as_str().unwrap().as_bytes().to_vec();
563 let value =
564 simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
565 return self
566 .parse(&value, type_expected)
567 .map(|d| d.to_owned_datum().into());
568 }
569
570 (DataType::List(item_type), ValueType::Array) => ListValue::new({
572 let array = value.as_array().unwrap();
573 let mut builder = item_type.create_array_builder(array.len());
574 for v in array {
575 let value = self.parse(v, item_type)?;
576 builder.append(value);
577 }
578 builder.finish()
579 })
580 .into(),
581
582 (DataType::Bytea, ValueType::String) => match self.bytea_handling {
584 ByteaHandling::Standard => str_to_bytea(value.as_str().unwrap())
585 .map_err(|_| create_error())?
586 .into(),
587 ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
588 .decode(value.as_str().unwrap())
589 .map_err(|_| create_error())?
590 .into_boxed_slice()
591 .into(),
592 },
593 (DataType::Jsonb, ValueType::String)
595 if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
596 {
597 JsonbVal::from_str(value.as_str().unwrap())
598 .map_err(|_| create_error())?
599 .into()
600 }
601 (DataType::Jsonb, _)
602 if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
603 {
604 let value: serde_json::Value =
605 value.clone().try_into().map_err(|_| create_error())?;
606 JsonbVal::from(value).into()
607 }
608 (
610 DataType::Int256,
611 ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
612 ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
613
614 (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
615 .map_err(|_| create_error())?
616 .into(),
617
618 (_expected, _got) => Err(create_error())?,
619 };
620 Ok(DatumCow::Owned(Some(v)))
621 }
622}
623
624pub struct JsonAccess<'a> {
625 value: BorrowedValue<'a>,
626 options: &'a JsonParseOptions,
627}
628
629impl<'a> JsonAccess<'a> {
630 pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
631 Self { value, options }
632 }
633
634 pub fn new(value: BorrowedValue<'a>) -> Self {
635 Self::new_with_options(value, &JsonParseOptions::DEFAULT)
636 }
637}
638
639impl Access for JsonAccess<'_> {
640 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
641 let mut value = &self.value;
642
643 for (idx, &key) in path.iter().enumerate() {
644 if let Some(sub_value) = if self.options.ignoring_keycase {
645 json_object_get_case_insensitive(value, key)
646 } else {
647 value.get(key)
648 } {
649 value = sub_value;
650 } else {
651 Err(AccessError::Undefined {
652 name: key.to_owned(),
653 path: path.iter().take(idx).join("."),
654 })?;
655 }
656 }
657
658 self.options.parse(value, type_expected)
659 }
660}
661
662fn json_object_get_case_insensitive<'b>(
666 v: &'b simd_json::BorrowedValue<'b>,
667 key: &str,
668) -> Option<&'b simd_json::BorrowedValue<'b>> {
669 let obj = v.as_object()?;
670 let value = obj.get(key);
671 if value.is_some() {
672 return value; }
674 for (k, v) in obj {
675 if k.eq_ignore_ascii_case(key) {
676 return Some(v);
677 }
678 }
679 None
680}