1use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded into a `bytea` value.
#[derive(Debug, Clone)]
pub enum ByteaHandling {
    /// Decode with `risingwave_common::cast::str_to_bytea`.
    Standard,
    /// Decode as standard-alphabet base64.
    Base64,
}
/// Unit of a JSON integer that is parsed into a `time` value.
#[derive(Debug, Clone)]
pub enum TimeHandling {
    /// The integer is a count of milliseconds (converted with `Time::with_milli`).
    Milli,
    /// The integer is a count of microseconds (converted with `Time::with_micro`).
    Micro,
}
51
/// How decimal values that arrive as JSON strings which do not parse as decimals are treated.
///
/// NOTE(review): the name suggests this mirrors Debezium's unsigned-bigint handling mode —
/// confirm against the connector options that construct it.
#[derive(Debug, Clone, Copy)]
pub enum BigintUnsignedHandlingMode {
    /// No fallback: a string that does not parse as a decimal is a type error.
    Long,
    /// Fall back to decoding the string as base64 big-endian two's-complement integer bytes.
    Precise,
}
59
/// How JSON strings and integers are parsed into `timestamptz`.
#[derive(Debug, Clone)]
pub enum TimestamptzHandling {
    /// Strings are parsed as `Timestamptz`; integers are rejected.
    UtcString,
    /// Strings are parsed as a zone-less `Timestamp` and interpreted as UTC; integers are
    /// rejected.
    UtcWithoutSuffix,
    /// Strings are parsed as `Timestamptz`; integers are milliseconds since the Unix epoch.
    Milli,
    /// Strings are parsed as `Timestamptz`; integers are microseconds since the Unix epoch.
    Micro,
    /// Strings are parsed as `Timestamptz`; the unit of integers is guessed by
    /// `i64_to_timestamptz`.
    GuessNumberUnit,
}
77
78impl TimestamptzHandling {
79    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
80
81    pub fn from_options(
82        options: &std::collections::BTreeMap<String, String>,
83    ) -> Result<Option<Self>, InvalidOptionError> {
84        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
85            Some("utc_string") => Self::UtcString,
86            Some("utc_without_suffix") => Self::UtcWithoutSuffix,
87            Some("micro") => Self::Micro,
88            Some("milli") => Self::Milli,
89            Some("guess_number_unit") => Self::GuessNumberUnit,
90            Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
91            None => return Ok(None),
92        };
93        Ok(Some(mode))
94    }
95}
96
/// How JSON integers are parsed into `timestamp` (strings are always parsed as `Timestamp`).
#[derive(Debug, Clone)]
pub enum TimestampHandling {
    /// Integers are milliseconds, converted with `Timestamp::with_millis`.
    Milli,
    /// The unit of integers is guessed by `i64_to_timestamp`.
    GuessNumberUnit,
}
102
/// How `jsonb` columns are read from JSON.
#[derive(Debug, Clone)]
pub enum JsonValueHandling {
    /// Any non-null JSON value is converted into `jsonb` as-is.
    AsValue,
    /// The JSON value must be a string whose contents are parsed as JSON text.
    AsString,
}
/// Which JSON inputs integer and floating-point columns accept.
///
/// Decimal and `int256` columns accept strings regardless of this setting.
#[derive(Debug, Clone)]
pub enum NumericHandling {
    /// Only JSON numbers of the matching kind; integers are not widened into float columns.
    Strict,
    /// JSON integers are also accepted by float columns.
    Relax {
        /// Additionally accept numeric strings such as `"42"` or `"1.5"`.
        string_parsing: bool,
    },
}
/// Which JSON inputs boolean columns accept.
#[derive(Debug, Clone)]
pub enum BooleanHandling {
    /// Only JSON `true` / `false`.
    Strict,
    /// Also accept the JSON integers `0` and `1`.
    Relax {
        /// Accept the strings `"true"` / `"false"`, compared case-insensitively.
        string_parsing: bool,
        /// Together with `string_parsing`, also accept the strings `"1"` / `"0"`.
        string_integer_parsing: bool,
    },
}
128
/// Which JSON inputs varchar columns accept.
#[derive(Debug, Clone)]
pub enum VarcharHandling {
    /// Only JSON strings.
    Strict,
    /// Also booleans and numbers, stored as their JSON text.
    OnlyPrimaryTypes,
    /// Also arrays and objects (in addition to booleans and numbers), stored as JSON text.
    AllTypes,
}
138
/// Which JSON inputs struct columns accept.
#[derive(Debug, Clone)]
pub enum StructHandling {
    /// Only JSON objects.
    Strict,
    /// Also a JSON string holding JSON text, which is parsed and then decoded as the struct.
    AllowJsonString,
}
147
148#[derive(Clone, Debug)]
149pub struct JsonParseOptions {
150    pub bytea_handling: ByteaHandling,
151    pub time_handling: TimeHandling,
152    pub timestamp_handling: TimestampHandling,
153    pub timestamptz_handling: TimestamptzHandling,
154    pub json_value_handling: JsonValueHandling,
155    pub numeric_handling: NumericHandling,
156    pub boolean_handling: BooleanHandling,
157    pub varchar_handling: VarcharHandling,
158    pub struct_handling: StructHandling,
159    pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
160    pub ignoring_keycase: bool,
161    pub handle_toast_columns: bool,
162}
163
164impl Default for JsonParseOptions {
165    fn default() -> Self {
166        Self::DEFAULT.clone()
167    }
168}
169
170impl JsonParseOptions {
171    pub const CANAL: JsonParseOptions = JsonParseOptions {
172        bytea_handling: ByteaHandling::Standard,
173        time_handling: TimeHandling::Micro,
174        timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
177        numeric_handling: NumericHandling::Relax {
178            string_parsing: true,
179        },
180        boolean_handling: BooleanHandling::Relax {
181            string_parsing: true,
182            string_integer_parsing: true,
183        },
184        varchar_handling: VarcharHandling::Strict,
185        struct_handling: StructHandling::Strict,
186        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
188        handle_toast_columns: false,
189    };
190    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
191        bytea_handling: ByteaHandling::Standard,
192        time_handling: TimeHandling::Micro,
193        timestamp_handling: TimestampHandling::GuessNumberUnit, timestamptz_handling: TimestamptzHandling::GuessNumberUnit, json_value_handling: JsonValueHandling::AsValue,
196        numeric_handling: NumericHandling::Relax {
197            string_parsing: true,
198        },
199        boolean_handling: BooleanHandling::Strict,
200        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
201        struct_handling: StructHandling::AllowJsonString,
202        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, ignoring_keycase: true,
204        handle_toast_columns: false,
205    };
206
207    pub fn new_for_debezium(
208        timestamptz_handling: TimestamptzHandling,
209        timestamp_handling: TimestampHandling,
210        time_handling: TimeHandling,
211        bigint_unsigned_handling: BigintUnsignedHandlingMode,
212        handle_toast_columns: bool,
213    ) -> Self {
214        Self {
215            bytea_handling: ByteaHandling::Base64,
216            time_handling,
217            timestamp_handling,
218            timestamptz_handling,
219            json_value_handling: JsonValueHandling::AsString,
220            numeric_handling: NumericHandling::Relax {
221                string_parsing: false,
222            },
223            boolean_handling: BooleanHandling::Relax {
224                string_parsing: false,
225                string_integer_parsing: false,
226            },
227            varchar_handling: VarcharHandling::Strict,
228            struct_handling: StructHandling::Strict,
229            bigint_unsigned_handling,
230            ignoring_keycase: true,
231            handle_toast_columns,
232        }
233    }
234
235    pub fn parse<'a>(
236        &self,
237        value: &'a BorrowedValue<'a>,
238        type_expected: &DataType,
239    ) -> AccessResult<DatumCow<'a>> {
240        let create_error = || AccessError::TypeError {
241            expected: format!("{:?}", type_expected),
242            got: value.value_type().to_string(),
243            value: value.to_string(),
244        };
245        let v: ScalarImpl = match (type_expected, value.value_type()) {
246            (_, ValueType::Null) => return Ok(DatumCow::NULL),
247            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
249
250            (
251                DataType::Boolean,
252                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
253            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
254                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
255            {
256                (value.as_i64() == Some(1i64)).into()
257            }
258
259            (DataType::Boolean, ValueType::String)
260                if matches!(
261                    self.boolean_handling,
262                    BooleanHandling::Relax {
263                        string_parsing: true,
264                        ..
265                    }
266                ) =>
267            {
268                match value.as_str().unwrap().to_lowercase().as_str() {
269                    "true" => true.into(),
270                    "false" => false.into(),
271                    c @ ("1" | "0")
272                        if matches!(
273                            self.boolean_handling,
274                            BooleanHandling::Relax {
275                                string_parsing: true,
276                                string_integer_parsing: true
277                            }
278                        ) =>
279                    {
280                        if c == "1" {
281                            true.into()
282                        } else {
283                            false.into()
284                        }
285                    }
286                    _ => Err(create_error())?,
287                }
288            }
289            (
291                DataType::Int16,
292                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
293            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
294
295            (DataType::Int16, ValueType::String)
296                if matches!(
297                    self.numeric_handling,
298                    NumericHandling::Relax {
299                        string_parsing: true
300                    }
301                ) =>
302            {
303                value
304                    .as_str()
305                    .unwrap()
306                    .parse::<i16>()
307                    .map_err(|_| create_error())?
308                    .into()
309            }
310            (
312                DataType::Int32,
313                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
314            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
315
316            (DataType::Int32, ValueType::String)
317                if matches!(
318                    self.numeric_handling,
319                    NumericHandling::Relax {
320                        string_parsing: true
321                    }
322                ) =>
323            {
324                value
325                    .as_str()
326                    .unwrap()
327                    .parse::<i32>()
328                    .map_err(|_| create_error())?
329                    .into()
330            }
331            (
333                DataType::Int64,
334                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
335            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
336
337            (DataType::Int64, ValueType::String)
338                if matches!(
339                    self.numeric_handling,
340                    NumericHandling::Relax {
341                        string_parsing: true
342                    }
343                ) =>
344            {
345                value
346                    .as_str()
347                    .unwrap()
348                    .parse::<i64>()
349                    .map_err(|_| create_error())?
350                    .into()
351            }
352            (
354                DataType::Float32,
355                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
356            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
357                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
358            }
359            (DataType::Float32, ValueType::String)
360                if matches!(
361                    self.numeric_handling,
362                    NumericHandling::Relax {
363                        string_parsing: true
364                    }
365                ) =>
366            {
367                value
368                    .as_str()
369                    .unwrap()
370                    .parse::<f32>()
371                    .map_err(|_| create_error())?
372                    .into()
373            }
374            (DataType::Float32, ValueType::F64) => {
375                value.try_as_f32().map_err(|_| create_error())?.into()
376            }
377            (
379                DataType::Float64,
380                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
381            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
382                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
383            }
384            (DataType::Float64, ValueType::String)
385                if matches!(
386                    self.numeric_handling,
387                    NumericHandling::Relax {
388                        string_parsing: true
389                    }
390                ) =>
391            {
392                value
393                    .as_str()
394                    .unwrap()
395                    .parse::<f64>()
396                    .map_err(|_| create_error())?
397                    .into()
398            }
399            (DataType::Float64, ValueType::F64) => {
400                value.try_as_f64().map_err(|_| create_error())?.into()
401            }
402            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
404                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
405                    .map_err(|_| create_error())?
406                    .into()
407            }
408            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
409                let i64_val = value.try_as_i64().map_err(|_| create_error())?;
410                Decimal::from(i64_val).into()
411            }
412            (DataType::Decimal, ValueType::String) => {
413                let str_val = value.as_str().unwrap();
414                match str_val {
416                    "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
417                    "POSITIVE_INFINITY" => {
418                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
419                            Decimal::PositiveInf,
420                        ))));
421                    }
422                    "NEGATIVE_INFINITY" => {
423                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
424                            Decimal::NegativeInf,
425                        ))));
426                    }
427                    _ => {}
428                }
429
430                Decimal::from_str(str_val)
431                    .or_else(|_err| {
432                        try_base64_decode_decimal(
433                            str_val,
434                            self.bigint_unsigned_handling,
435                            create_error,
436                        )
437                    })?
438                    .into()
439            }
440
441            (DataType::Decimal, ValueType::F64) => {
442                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
443                    .map_err(|_| create_error())?
444                    .into()
445            }
446            (DataType::Decimal, ValueType::Object) => {
447                let scale = value
450                    .get("scale")
451                    .ok_or_else(create_error)?
452                    .as_i32()
453                    .unwrap();
454                let value = value
455                    .get("value")
456                    .ok_or_else(create_error)?
457                    .as_str()
458                    .unwrap()
459                    .as_bytes();
460                let unscaled = BigInt::from_signed_bytes_be(value);
461                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
462                ScalarImpl::Decimal(Decimal::Normalized(decimal))
463            }
464            (
466                DataType::Date,
467                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
468            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
469                .map_err(|_| create_error())?
470                .into(),
471            (DataType::Date, ValueType::String) => value
472                .as_str()
473                .unwrap()
474                .parse::<Date>()
475                .map_err(|_| create_error())?
476                .into(),
477            (DataType::Varchar, ValueType::String) => {
479                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
480            }
481            (
482                DataType::Varchar,
483                ValueType::Bool
484                | ValueType::I64
485                | ValueType::I128
486                | ValueType::U64
487                | ValueType::U128
488                | ValueType::F64,
489            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
490                value.to_string().into()
491            }
492            (
493                DataType::Varchar,
494                ValueType::Bool
495                | ValueType::I64
496                | ValueType::I128
497                | ValueType::U64
498                | ValueType::U128
499                | ValueType::F64
500                | ValueType::Array
501                | ValueType::Object,
502            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
503                value.to_string().into()
504            }
505            (DataType::Time, ValueType::String) => value
507                .as_str()
508                .unwrap()
509                .parse::<Time>()
510                .map_err(|_| create_error())?
511                .into(),
512            (
513                DataType::Time,
514                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
515            ) => value
516                .as_i64()
517                .map(|i| match self.time_handling {
518                    TimeHandling::Milli => Time::with_milli(i as u32),
519                    TimeHandling::Micro => Time::with_micro(i as u64),
520                })
521                .unwrap()
522                .map_err(|_| create_error())?
523                .into(),
524            (DataType::Timestamp, ValueType::String) => value
526                .as_str()
527                .unwrap()
528                .parse::<Timestamp>()
529                .map_err(|_| create_error())?
530                .into(),
531            (
532                DataType::Timestamp,
533                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
534            ) => {
535                match self.timestamp_handling {
536                    TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
539                        .map_err(|_| create_error())?
540                        .into(),
541                    TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
542                        .map_err(|_| create_error())?
543                        .into(),
544                }
545            }
546            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
548                TimestamptzHandling::UtcWithoutSuffix => value
549                    .as_str()
550                    .unwrap()
551                    .parse::<Timestamp>()
552                    .map(|naive_utc| {
553                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
554                    })
555                    .map_err(|_| create_error())?
556                    .into(),
557                _ => value
559                    .as_str()
560                    .unwrap()
561                    .parse::<Timestamptz>()
562                    .map_err(|_| create_error())?
563                    .into(),
564            },
565            (
566                DataType::Timestamptz,
567                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
568            ) => value
569                .as_i64()
570                .and_then(|num| match self.timestamptz_handling {
571                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
572                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
573                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
574                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
576                })
577                .ok_or_else(create_error)?
578                .into(),
579            (DataType::Interval, ValueType::String) => value
581                .as_str()
582                .unwrap()
583                .parse::<Interval>()
584                .map_err(|_| create_error())?
585                .into(),
586            (DataType::Struct(struct_type_info), ValueType::Object) => {
588                let mut fields = Vec::with_capacity(struct_type_info.len());
591                for (field_name, field_type) in struct_type_info.iter() {
592                    let field_value = json_object_get_case_insensitive(value, field_name)
593                            .unwrap_or_else(|| {
594                                let error = AccessError::Undefined {
595                                    name: field_name.to_owned(),
596                                    path: struct_type_info.to_string(), };
598                                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =  LazyLock::new(LogSuppresser::default);
600                                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
601                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
602                                }
603                                &BorrowedValue::Static(simd_json::StaticNode::Null)
604                            });
605                    fields.push(
606                        self.parse(field_value, field_type)
607                            .map(|d| d.to_owned_datum())?,
608                    );
609                }
610                StructValue::new(fields).into()
611            }
612
613            (DataType::Struct(_), ValueType::String)
616                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
617            {
618                let mut value = value.as_str().unwrap().as_bytes().to_vec();
620                let value =
621                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
622                return self
623                    .parse(&value, type_expected)
624                    .map(|d| d.to_owned_datum().into());
625            }
626
627            (DataType::List(list_type), ValueType::Array) => ListValue::new({
629                let item_type = list_type.elem();
630                let array = value.as_array().unwrap();
631                let mut builder = item_type.create_array_builder(array.len());
632                for v in array {
633                    let value = self.parse(v, item_type)?;
634                    builder.append(value);
635                }
636                builder.finish()
637            })
638            .into(),
639
640            (DataType::Bytea, ValueType::String) => {
642                let value_str = value.as_str().unwrap();
643
644                match self.bytea_handling {
645                    ByteaHandling::Standard => {
646                        str_to_bytea(value_str).map_err(|_| create_error())?.into()
647                    }
648                    ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
649                        .decode(value_str)
650                        .map_err(|_| create_error())?
651                        .into_boxed_slice()
652                        .into(),
653                }
654            }
655            (DataType::Jsonb, ValueType::String)
657                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
658            {
659                match self.handle_toast_columns {
663                    true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
664                        .map_err(|_| create_error())?
665                        .into(),
666                    false => JsonbVal::from_str(value.as_str().unwrap())
667                        .map_err(|_| create_error())?
668                        .into(),
669                }
670            }
671            (DataType::Jsonb, _)
672                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
673            {
674                let value: serde_json::Value =
675                    value.clone().try_into().map_err(|_| create_error())?;
676                JsonbVal::from(value).into()
677            }
678            (
680                DataType::Int256,
681                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
682            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
683
684            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
685                .map_err(|_| create_error())?
686                .into(),
687
688            (_expected, _got) => Err(create_error())?,
689        };
690        Ok(DatumCow::Owned(Some(v)))
691    }
692}
693
694fn try_base64_decode_decimal(
702    str_val: &str,
703    bigint_unsigned_handling: BigintUnsignedHandlingMode,
704    create_error: impl Fn() -> AccessError,
705) -> Result<Decimal, AccessError> {
706    match bigint_unsigned_handling {
707        BigintUnsignedHandlingMode::Precise => {
708            let value = base64::engine::general_purpose::STANDARD
713                .decode(str_val)
714                .map_err(|_| create_error())?;
715            let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
716            Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
717        }
718        BigintUnsignedHandlingMode::Long => {
719            Err(create_error())
721        }
722    }
723}
724
725pub struct JsonAccess<'a> {
726    value: BorrowedValue<'a>,
727    options: &'a JsonParseOptions,
728}
729
730impl<'a> JsonAccess<'a> {
731    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
732        Self { value, options }
733    }
734
735    pub fn new(value: BorrowedValue<'a>) -> Self {
736        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
737    }
738}
739
740impl Access for JsonAccess<'_> {
741    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
742        let mut value = &self.value;
743
744        for (idx, &key) in path.iter().enumerate() {
745            if let Some(sub_value) = if self.options.ignoring_keycase {
746                json_object_get_case_insensitive(value, key)
747            } else {
748                value.get(key)
749            } {
750                value = sub_value;
751            } else {
752                Err(AccessError::Undefined {
753                    name: key.to_owned(),
754                    path: path.iter().take(idx).join("."),
755                })?;
756            }
757        }
758
759        self.options.parse(value, type_expected)
760    }
761}
762
763fn json_object_get_case_insensitive<'b>(
767    v: &'b simd_json::BorrowedValue<'b>,
768    key: &str,
769) -> Option<&'b simd_json::BorrowedValue<'b>> {
770    let obj = v.as_object()?;
771    let value = obj.get(key);
772    if value.is_some() {
773        return value; }
775    for (k, v) in obj {
776        if k.eq_ignore_ascii_case(key) {
777            return Some(v);
778        }
779    }
780    None
781}