risingwave_connector/parser/unified/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// Selects how a JSON string is decoded into a `bytea` value.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// The string is decoded with `str_to_bytea`.
    Standard,
    /// The string is decoded as standard base64.
    ///
    /// Debezium converts postgres bytea to base64 format.
    Base64,
}
/// Selects the unit used when a JSON integer is parsed into a `time` value.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
51
/// Selects which JSON representation is accepted for a `timestamptz` value.
///
/// The example attached to each variant shows the accepted input shape.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// `"2024-04-11T02:00:00.123456Z"`
    UtcString,
    /// `"2024-04-11 02:00:00.123456"`
    UtcWithoutSuffix,
    /// `1712800800123`
    Milli,
    /// `1712800800123456`
    Micro,
    /// Both `1712800800123` (ms) and `1712800800123456` (us) map to `2024-04-11`.
    ///
    /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`.
    ///
    /// This option is backward compatible.
    GuessNumberUnit,
}
69
70impl TimestamptzHandling {
71    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
72
73    pub fn from_options(
74        options: &std::collections::BTreeMap<String, String>,
75    ) -> Result<Option<Self>, InvalidOptionError> {
76        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
77            Some("utc_string") => Self::UtcString,
78            Some("utc_without_suffix") => Self::UtcWithoutSuffix,
79            Some("micro") => Self::Micro,
80            Some("milli") => Self::Milli,
81            Some("guess_number_unit") => Self::GuessNumberUnit,
82            Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
83            None => return Ok(None),
84        };
85        Ok(Some(mode))
86    }
87}
88
/// Selects which JSON inputs are accepted for a `jsonb` value.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any non-null JSON value is converted to `jsonb` as-is.
    AsValue,
    /// Only a JSON string is accepted; its content is parsed as JSON text.
    AsString,
}
/// Selects how lenient numeric parsing is.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// JSON integers are not accepted for float types, and strings are not parsed as numbers.
    Strict,
    /// should integer be parsed to float
    Relax {
        /// should "3.14" be parsed to 3.14 in float
        ///
        /// The same flag also gates parsing strings into the `int16`/`int32`/`int64` types.
        string_parsing: bool,
    },
}
/// Selects which JSON inputs besides `true`/`false` are accepted for a boolean value.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// should integer 1,0 be parsed to boolean (debezium)
    Relax {
        /// should "True" "False" be parsed to true or false in boolean
        string_parsing: bool,
        /// should string "1" "0" be parsed to boolean (canal + mysql)
        ///
        /// Only takes effect when `string_parsing` is also `true`.
        string_integer_parsing: bool,
    },
}
114
/// Selects which non-string JSON values may be cast to `varchar`.
///
/// A JSON `null` always parses to SQL `NULL`, whatever the mode.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// do not allow other types cast to varchar
    Strict,
    /// allow Json Value (Null, Bool, I64, I128, U64, U128, F64) cast to varchar
    OnlyPrimaryTypes,
    /// allow all type cast to varchar (inc. Array, Object)
    AllTypes,
}
124
/// Selects which JSON inputs are accepted for a struct value.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// only allow object parsed to struct
    Strict,
    /// allow string containing a serialized json object (like "{\"a\": 1, \"b\": 2}") parsed to
    /// struct
    AllowJsonString,
}
133
/// Set of handling modes that control how `JsonParseOptions::parse` converts JSON values
/// into typed datums.
#[derive(Clone, Debug)]
pub struct JsonParseOptions {
    /// How JSON strings are decoded into `bytea`.
    pub bytea_handling: ByteaHandling,
    /// Unit of JSON integers parsed into `time`.
    pub time_handling: TimeHandling,
    /// Accepted representation of `timestamptz` values.
    pub timestamptz_handling: TimestamptzHandling,
    /// Whether `jsonb` is taken from the JSON value itself or from a string holding JSON text.
    pub json_value_handling: JsonValueHandling,
    /// Leniency applied when parsing numeric types.
    pub numeric_handling: NumericHandling,
    /// Leniency applied when parsing booleans.
    pub boolean_handling: BooleanHandling,
    /// Which non-string JSON values may be cast to `varchar`.
    pub varchar_handling: VarcharHandling,
    /// Whether a string holding a serialized JSON object may be parsed into a struct.
    pub struct_handling: StructHandling,
    /// When `true`, `JsonAccess::access` resolves path keys case-insensitively.
    ///
    /// Nested struct fields inside `parse` are looked up case-insensitively regardless of
    /// this flag.
    pub ignoring_keycase: bool,
}
146
147impl Default for JsonParseOptions {
148    fn default() -> Self {
149        Self::DEFAULT.clone()
150    }
151}
152
153impl JsonParseOptions {
154    pub const CANAL: JsonParseOptions = JsonParseOptions {
155        bytea_handling: ByteaHandling::Standard,
156        time_handling: TimeHandling::Micro,
157        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
158        json_value_handling: JsonValueHandling::AsValue,
159        numeric_handling: NumericHandling::Relax {
160            string_parsing: true,
161        },
162        boolean_handling: BooleanHandling::Relax {
163            string_parsing: true,
164            string_integer_parsing: true,
165        },
166        varchar_handling: VarcharHandling::Strict,
167        struct_handling: StructHandling::Strict,
168        ignoring_keycase: true,
169    };
170    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
171        bytea_handling: ByteaHandling::Standard,
172        time_handling: TimeHandling::Micro,
173        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
174        json_value_handling: JsonValueHandling::AsValue,
175        numeric_handling: NumericHandling::Relax {
176            string_parsing: true,
177        },
178        boolean_handling: BooleanHandling::Strict,
179        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
180        struct_handling: StructHandling::AllowJsonString,
181        ignoring_keycase: true,
182    };
183
184    pub fn new_for_debezium(timestamptz_handling: TimestamptzHandling) -> Self {
185        Self {
186            bytea_handling: ByteaHandling::Base64,
187            time_handling: TimeHandling::Micro,
188            timestamptz_handling,
189            json_value_handling: JsonValueHandling::AsString,
190            numeric_handling: NumericHandling::Relax {
191                string_parsing: false,
192            },
193            boolean_handling: BooleanHandling::Relax {
194                string_parsing: false,
195                string_integer_parsing: false,
196            },
197            varchar_handling: VarcharHandling::Strict,
198            struct_handling: StructHandling::Strict,
199            ignoring_keycase: true,
200        }
201    }
202
203    pub fn parse<'a>(
204        &self,
205        value: &'a BorrowedValue<'a>,
206        type_expected: &DataType,
207    ) -> AccessResult<DatumCow<'a>> {
208        let create_error = || AccessError::TypeError {
209            expected: format!("{:?}", type_expected),
210            got: value.value_type().to_string(),
211            value: value.to_string(),
212        };
213
214        let v: ScalarImpl = match (type_expected, value.value_type()) {
215            (_, ValueType::Null) => return Ok(DatumCow::NULL),
216            // ---- Boolean -----
217            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
218
219            (
220                DataType::Boolean,
221                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
222            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
223                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
224            {
225                (value.as_i64() == Some(1i64)).into()
226            }
227
228            (DataType::Boolean, ValueType::String)
229                if matches!(
230                    self.boolean_handling,
231                    BooleanHandling::Relax {
232                        string_parsing: true,
233                        ..
234                    }
235                ) =>
236            {
237                match value.as_str().unwrap().to_lowercase().as_str() {
238                    "true" => true.into(),
239                    "false" => false.into(),
240                    c @ ("1" | "0")
241                        if matches!(
242                            self.boolean_handling,
243                            BooleanHandling::Relax {
244                                string_parsing: true,
245                                string_integer_parsing: true
246                            }
247                        ) =>
248                    {
249                        if c == "1" {
250                            true.into()
251                        } else {
252                            false.into()
253                        }
254                    }
255                    _ => Err(create_error())?,
256                }
257            }
258            // ---- Int16 -----
259            (
260                DataType::Int16,
261                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
262            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
263
264            (DataType::Int16, ValueType::String)
265                if matches!(
266                    self.numeric_handling,
267                    NumericHandling::Relax {
268                        string_parsing: true
269                    }
270                ) =>
271            {
272                value
273                    .as_str()
274                    .unwrap()
275                    .parse::<i16>()
276                    .map_err(|_| create_error())?
277                    .into()
278            }
279            // ---- Int32 -----
280            (
281                DataType::Int32,
282                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
283            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
284
285            (DataType::Int32, ValueType::String)
286                if matches!(
287                    self.numeric_handling,
288                    NumericHandling::Relax {
289                        string_parsing: true
290                    }
291                ) =>
292            {
293                value
294                    .as_str()
295                    .unwrap()
296                    .parse::<i32>()
297                    .map_err(|_| create_error())?
298                    .into()
299            }
300            // ---- Int64 -----
301            (
302                DataType::Int64,
303                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
304            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
305
306            (DataType::Int64, ValueType::String)
307                if matches!(
308                    self.numeric_handling,
309                    NumericHandling::Relax {
310                        string_parsing: true
311                    }
312                ) =>
313            {
314                value
315                    .as_str()
316                    .unwrap()
317                    .parse::<i64>()
318                    .map_err(|_| create_error())?
319                    .into()
320            }
321            // ---- Float32 -----
322            (
323                DataType::Float32,
324                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
325            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
326                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
327            }
328            (DataType::Float32, ValueType::String)
329                if matches!(
330                    self.numeric_handling,
331                    NumericHandling::Relax {
332                        string_parsing: true
333                    }
334                ) =>
335            {
336                value
337                    .as_str()
338                    .unwrap()
339                    .parse::<f32>()
340                    .map_err(|_| create_error())?
341                    .into()
342            }
343            (DataType::Float32, ValueType::F64) => {
344                value.try_as_f32().map_err(|_| create_error())?.into()
345            }
346            // ---- Float64 -----
347            (
348                DataType::Float64,
349                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
350            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
351                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
352            }
353            (DataType::Float64, ValueType::String)
354                if matches!(
355                    self.numeric_handling,
356                    NumericHandling::Relax {
357                        string_parsing: true
358                    }
359                ) =>
360            {
361                value
362                    .as_str()
363                    .unwrap()
364                    .parse::<f64>()
365                    .map_err(|_| create_error())?
366                    .into()
367            }
368            (DataType::Float64, ValueType::F64) => {
369                value.try_as_f64().map_err(|_| create_error())?.into()
370            }
371            // ---- Decimal -----
372            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
373                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
374                    .map_err(|_| create_error())?
375                    .into()
376            }
377            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
378                Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
379            }
380
381            (DataType::Decimal, ValueType::F64) => {
382                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
383                    .map_err(|_| create_error())?
384                    .into()
385            }
386            (DataType::Decimal, ValueType::String) => {
387                let str = value.as_str().unwrap();
388                // the following values are special string generated by Debezium and should be handled separately
389                match str {
390                    "NAN" => ScalarImpl::Decimal(Decimal::NaN),
391                    "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
392                    "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
393                    _ => {
394                        ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?)
395                    }
396                }
397            }
398            (DataType::Decimal, ValueType::Object) => {
399                // ref https://github.com/risingwavelabs/risingwave/issues/10628
400                // handle debezium json (variable scale): {"scale": int, "value": bytes}
401                let scale = value
402                    .get("scale")
403                    .ok_or_else(create_error)?
404                    .as_i32()
405                    .unwrap();
406                let value = value
407                    .get("value")
408                    .ok_or_else(create_error)?
409                    .as_str()
410                    .unwrap()
411                    .as_bytes();
412                let unscaled = BigInt::from_signed_bytes_be(value);
413                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
414                ScalarImpl::Decimal(Decimal::Normalized(decimal))
415            }
416            // ---- Date -----
417            (
418                DataType::Date,
419                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
420            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
421                .map_err(|_| create_error())?
422                .into(),
423            (DataType::Date, ValueType::String) => value
424                .as_str()
425                .unwrap()
426                .parse::<Date>()
427                .map_err(|_| create_error())?
428                .into(),
429            // ---- Varchar -----
430            (DataType::Varchar, ValueType::String) => {
431                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
432            }
433            (
434                DataType::Varchar,
435                ValueType::Bool
436                | ValueType::I64
437                | ValueType::I128
438                | ValueType::U64
439                | ValueType::U128
440                | ValueType::F64,
441            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
442                value.to_string().into()
443            }
444            (
445                DataType::Varchar,
446                ValueType::Bool
447                | ValueType::I64
448                | ValueType::I128
449                | ValueType::U64
450                | ValueType::U128
451                | ValueType::F64
452                | ValueType::Array
453                | ValueType::Object,
454            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
455                value.to_string().into()
456            }
457            // ---- Time -----
458            (DataType::Time, ValueType::String) => value
459                .as_str()
460                .unwrap()
461                .parse::<Time>()
462                .map_err(|_| create_error())?
463                .into(),
464            (
465                DataType::Time,
466                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
467            ) => value
468                .as_i64()
469                .map(|i| match self.time_handling {
470                    TimeHandling::Milli => Time::with_milli(i as u32),
471                    TimeHandling::Micro => Time::with_micro(i as u64),
472                })
473                .unwrap()
474                .map_err(|_| create_error())?
475                .into(),
476            // ---- Timestamp -----
477            (DataType::Timestamp, ValueType::String) => value
478                .as_str()
479                .unwrap()
480                .parse::<Timestamp>()
481                .map_err(|_| create_error())?
482                .into(),
483            (
484                DataType::Timestamp,
485                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
486            ) => i64_to_timestamp(value.as_i64().unwrap())
487                .map_err(|_| create_error())?
488                .into(),
489            // ---- Timestamptz -----
490            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
491                TimestamptzHandling::UtcWithoutSuffix => value
492                    .as_str()
493                    .unwrap()
494                    .parse::<Timestamp>()
495                    .map(|naive_utc| {
496                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
497                    })
498                    .map_err(|_| create_error())?
499                    .into(),
500                // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
501                _ => value
502                    .as_str()
503                    .unwrap()
504                    .parse::<Timestamptz>()
505                    .map_err(|_| create_error())?
506                    .into(),
507            },
508            (
509                DataType::Timestamptz,
510                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
511            ) => value
512                .as_i64()
513                .and_then(|num| match self.timestamptz_handling {
514                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
515                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
516                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
517                    // When explicitly requested string format, number without units are rejected.
518                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
519                })
520                .ok_or_else(create_error)?
521                .into(),
522            // ---- Interval -----
523            (DataType::Interval, ValueType::String) => value
524                .as_str()
525                .unwrap()
526                .parse::<Interval>()
527                .map_err(|_| create_error())?
528                .into(),
529            // ---- Struct -----
530            (DataType::Struct(struct_type_info), ValueType::Object) => {
531                // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
532                // https://github.com/rust-lang/rust/issues/48994
533                let mut fields = Vec::with_capacity(struct_type_info.len());
534                for (field_name, field_type) in struct_type_info.iter() {
535                    let field_value = json_object_get_case_insensitive(value, field_name)
536                            .unwrap_or_else(|| {
537                                let error = AccessError::Undefined {
538                                    name: field_name.to_owned(),
539                                    path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
540                                };
541                                // TODO: is it possible to unify the logging with the one in `do_action`?
542                                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =  LazyLock::new(LogSuppresser::default);
543                                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
544                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
545                                }
546                                &BorrowedValue::Static(simd_json::StaticNode::Null)
547                            });
548                    fields.push(
549                        self.parse(field_value, field_type)
550                            .map(|d| d.to_owned_datum())?,
551                    );
552                }
553                StructValue::new(fields).into()
554            }
555
556            // String containing json object, e.g. "{\"a\": 1, \"b\": 2}"
557            // Try to parse it as json object.
558            (DataType::Struct(_), ValueType::String)
559                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
560            {
561                // TODO: avoid copy by accepting `&mut BorrowedValue` in `parse` method.
562                let mut value = value.as_str().unwrap().as_bytes().to_vec();
563                let value =
564                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
565                return self
566                    .parse(&value, type_expected)
567                    .map(|d| d.to_owned_datum().into());
568            }
569
570            // ---- List -----
571            (DataType::List(item_type), ValueType::Array) => ListValue::new({
572                let array = value.as_array().unwrap();
573                let mut builder = item_type.create_array_builder(array.len());
574                for v in array {
575                    let value = self.parse(v, item_type)?;
576                    builder.append(value);
577                }
578                builder.finish()
579            })
580            .into(),
581
582            // ---- Bytea -----
583            (DataType::Bytea, ValueType::String) => match self.bytea_handling {
584                ByteaHandling::Standard => str_to_bytea(value.as_str().unwrap())
585                    .map_err(|_| create_error())?
586                    .into(),
587                ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
588                    .decode(value.as_str().unwrap())
589                    .map_err(|_| create_error())?
590                    .into_boxed_slice()
591                    .into(),
592            },
593            // ---- Jsonb -----
594            (DataType::Jsonb, ValueType::String)
595                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
596            {
597                JsonbVal::from_str(value.as_str().unwrap())
598                    .map_err(|_| create_error())?
599                    .into()
600            }
601            (DataType::Jsonb, _)
602                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
603            {
604                let value: serde_json::Value =
605                    value.clone().try_into().map_err(|_| create_error())?;
606                JsonbVal::from(value).into()
607            }
608            // ---- Int256 -----
609            (
610                DataType::Int256,
611                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
612            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
613
614            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
615                .map_err(|_| create_error())?
616                .into(),
617
618            (_expected, _got) => Err(create_error())?,
619        };
620        Ok(DatumCow::Owned(Some(v)))
621    }
622}
623
/// Accessor over a parsed JSON document that resolves a key path and converts the value
/// found there with the attached [`JsonParseOptions`].
pub struct JsonAccess<'a> {
    /// Root JSON value that paths are resolved against.
    value: BorrowedValue<'a>,
    /// Handling modes applied when converting the resolved value.
    options: &'a JsonParseOptions,
}
628
629impl<'a> JsonAccess<'a> {
630    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
631        Self { value, options }
632    }
633
634    pub fn new(value: BorrowedValue<'a>) -> Self {
635        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
636    }
637}
638
639impl Access for JsonAccess<'_> {
640    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
641        let mut value = &self.value;
642
643        for (idx, &key) in path.iter().enumerate() {
644            if let Some(sub_value) = if self.options.ignoring_keycase {
645                json_object_get_case_insensitive(value, key)
646            } else {
647                value.get(key)
648            } {
649                value = sub_value;
650            } else {
651                Err(AccessError::Undefined {
652                    name: key.to_owned(),
653                    path: path.iter().take(idx).join("."),
654                })?;
655            }
656        }
657
658        self.options.parse(value, type_expected)
659    }
660}
661
662/// Get a value from a json object by key, case insensitive.
663///
664/// Returns `None` if the given json value is not an object, or the key is not found.
665fn json_object_get_case_insensitive<'b>(
666    v: &'b simd_json::BorrowedValue<'b>,
667    key: &str,
668) -> Option<&'b simd_json::BorrowedValue<'b>> {
669    let obj = v.as_object()?;
670    let value = obj.get(key);
671    if value.is_some() {
672        return value; // fast path
673    }
674    for (k, v) in obj {
675        if k.eq_ignore_ascii_case(key) {
676            return Some(v);
677        }
678    }
679    None
680}