risingwave_connector/parser/unified/json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded into a `bytea` value.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Base64-decode the string; Debezium emits Postgres `bytea` values in base64.
    Base64,
}
/// Unit of a JSON integer read as a `time` value.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is a count of milliseconds (passed to `Time::with_milli`).
    Milli,
    /// The integer is a count of microseconds (passed to `Time::with_micro`).
    Micro,
}
51
/// How unsigned bigint values are represented upstream
/// (Debezium `bigint.unsigned.handling.mode`).
#[derive(Clone, Copy, Debug)]
pub enum BigintUnsignedHandlingMode {
    /// Values arrive as signed bigints; no extra decoding is attempted (default).
    Long,
    /// Values may arrive as base64-encoded decimals (Debezium precise mode).
    Precise,
}
59
/// How JSON strings and integers are read as `timestamptz` values.
///
/// Selected through the `timestamptz.handling.mode` option.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// String form with a UTC suffix, e.g. `"2024-04-11T02:00:00.123456Z"`.
    /// Integers are rejected.
    UtcString,
    /// String form without a suffix, interpreted as UTC, e.g. `"2024-04-11 02:00:00.123456"`.
    /// Integers are rejected.
    UtcWithoutSuffix,
    /// Integer milliseconds, e.g. `1712800800123`.
    Milli,
    /// Integer microseconds, e.g. `1712800800123456`.
    Micro,
    /// Integer whose unit is guessed: both `1712800800123` (ms) and `1712800800123456` (us)
    /// map to `2024-04-11`.
    ///
    /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`.
    ///
    /// This option is backward compatible.
    GuessNumberUnit,
}
77
78impl TimestamptzHandling {
79    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
80
81    pub fn from_options(
82        options: &std::collections::BTreeMap<String, String>,
83    ) -> Result<Option<Self>, InvalidOptionError> {
84        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
85            Some("utc_string") => Self::UtcString,
86            Some("utc_without_suffix") => Self::UtcWithoutSuffix,
87            Some("micro") => Self::Micro,
88            Some("milli") => Self::Milli,
89            Some("guess_number_unit") => Self::GuessNumberUnit,
90            Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
91            None => return Ok(None),
92        };
93        Ok(Some(mode))
94    }
95}
96
/// How a JSON integer is read as a `timestamp` value.
#[derive(Clone, Debug)]
pub enum TimestampHandling {
    /// The integer is a count of milliseconds (passed to `Timestamp::with_millis`).
    Milli,
    /// The unit is inferred by `i64_to_timestamp`; kept for backward compatibility.
    GuessNumberUnit,
}
102
/// How a value for a `jsonb` column is given.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any JSON value is converted to `jsonb` as-is.
    AsValue,
    /// The value must be a string containing serialized JSON (as produced by Debezium).
    AsString,
}
/// Which extra JSON types integer and float columns accept.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// Float columns reject JSON integers; integer and float columns reject strings.
    Strict,
    /// Float columns also accept JSON integers.
    Relax {
        /// Whether integer and float columns also accept numeric strings such as `"3.14"`.
        string_parsing: bool,
    },
}
/// Which JSON values besides booleans a `boolean` column accepts.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans.
    Strict,
    /// Also the JSON integers `0` and `1` (Debezium).
    Relax {
        /// Whether the strings `"true"` / `"false"` (case-insensitive) are accepted.
        string_parsing: bool,
        /// Whether the strings `"1"` / `"0"` are accepted (Canal + MySQL). Only takes effect
        /// when `string_parsing` is enabled as well.
        string_integer_parsing: bool,
    },
}
128
/// Which non-string JSON values may be stringified into `varchar`.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Only JSON strings are accepted.
    Strict,
    /// Booleans and numbers (`Bool`, `I64`, `I128`, `U64`, `U128`, `F64`) are stringified too.
    OnlyPrimaryTypes,
    /// Arrays and objects are stringified too, in addition to booleans and numbers.
    AllTypes,
}
138
/// How a value for a `struct` column may be given.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only as a JSON object.
    Strict,
    /// Also as a string holding a serialized JSON object, e.g. `"{\"a\": 1, \"b\": 2}"`.
    AllowJsonString,
}
147
148#[derive(Clone, Debug)]
149pub struct JsonParseOptions {
150    pub bytea_handling: ByteaHandling,
151    pub time_handling: TimeHandling,
152    pub timestamp_handling: TimestampHandling,
153    pub timestamptz_handling: TimestamptzHandling,
154    pub json_value_handling: JsonValueHandling,
155    pub numeric_handling: NumericHandling,
156    pub boolean_handling: BooleanHandling,
157    pub varchar_handling: VarcharHandling,
158    pub struct_handling: StructHandling,
159    pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
160    pub ignoring_keycase: bool,
161    pub handle_toast_columns: bool,
162}
163
164impl Default for JsonParseOptions {
165    fn default() -> Self {
166        Self::DEFAULT.clone()
167    }
168}
169
170impl JsonParseOptions {
171    pub const CANAL: JsonParseOptions = JsonParseOptions {
172        bytea_handling: ByteaHandling::Standard,
173        time_handling: TimeHandling::Micro,
174        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
175        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
176        json_value_handling: JsonValueHandling::AsValue,
177        numeric_handling: NumericHandling::Relax {
178            string_parsing: true,
179        },
180        boolean_handling: BooleanHandling::Relax {
181            string_parsing: true,
182            string_integer_parsing: true,
183        },
184        varchar_handling: VarcharHandling::Strict,
185        struct_handling: StructHandling::Strict,
186        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
187        ignoring_keycase: true,
188        handle_toast_columns: false,
189    };
190    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
191        bytea_handling: ByteaHandling::Standard,
192        time_handling: TimeHandling::Micro,
193        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
194        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
195        json_value_handling: JsonValueHandling::AsValue,
196        numeric_handling: NumericHandling::Relax {
197            string_parsing: true,
198        },
199        boolean_handling: BooleanHandling::Strict,
200        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
201        struct_handling: StructHandling::AllowJsonString,
202        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
203        ignoring_keycase: true,
204        handle_toast_columns: false,
205    };
206
207    pub fn new_for_debezium(
208        timestamptz_handling: TimestamptzHandling,
209        timestamp_handling: TimestampHandling,
210        time_handling: TimeHandling,
211        bigint_unsigned_handling: BigintUnsignedHandlingMode,
212        handle_toast_columns: bool,
213    ) -> Self {
214        Self {
215            bytea_handling: ByteaHandling::Base64,
216            time_handling,
217            timestamp_handling,
218            timestamptz_handling,
219            json_value_handling: JsonValueHandling::AsString,
220            numeric_handling: NumericHandling::Relax {
221                string_parsing: false,
222            },
223            boolean_handling: BooleanHandling::Relax {
224                string_parsing: false,
225                string_integer_parsing: false,
226            },
227            varchar_handling: VarcharHandling::Strict,
228            struct_handling: StructHandling::Strict,
229            bigint_unsigned_handling,
230            ignoring_keycase: true,
231            handle_toast_columns,
232        }
233    }
234
235    pub fn parse<'a>(
236        &self,
237        value: &'a BorrowedValue<'a>,
238        type_expected: &DataType,
239    ) -> AccessResult<DatumCow<'a>> {
240        let create_error = || AccessError::TypeError {
241            expected: format!("{:?}", type_expected),
242            got: value.value_type().to_string(),
243            value: value.to_string(),
244        };
245        let v: ScalarImpl = match (type_expected, value.value_type()) {
246            (_, ValueType::Null) => return Ok(DatumCow::NULL),
247            // ---- Boolean -----
248            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
249
250            (
251                DataType::Boolean,
252                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
253            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
254                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
255            {
256                (value.as_i64() == Some(1i64)).into()
257            }
258
259            (DataType::Boolean, ValueType::String)
260                if matches!(
261                    self.boolean_handling,
262                    BooleanHandling::Relax {
263                        string_parsing: true,
264                        ..
265                    }
266                ) =>
267            {
268                match value.as_str().unwrap().to_lowercase().as_str() {
269                    "true" => true.into(),
270                    "false" => false.into(),
271                    c @ ("1" | "0")
272                        if matches!(
273                            self.boolean_handling,
274                            BooleanHandling::Relax {
275                                string_parsing: true,
276                                string_integer_parsing: true
277                            }
278                        ) =>
279                    {
280                        if c == "1" {
281                            true.into()
282                        } else {
283                            false.into()
284                        }
285                    }
286                    _ => Err(create_error())?,
287                }
288            }
289            // ---- Int16 -----
290            (
291                DataType::Int16,
292                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
293            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
294
295            (DataType::Int16, ValueType::String)
296                if matches!(
297                    self.numeric_handling,
298                    NumericHandling::Relax {
299                        string_parsing: true
300                    }
301                ) =>
302            {
303                value
304                    .as_str()
305                    .unwrap()
306                    .parse::<i16>()
307                    .map_err(|_| create_error())?
308                    .into()
309            }
310            // ---- Int32 -----
311            (
312                DataType::Int32,
313                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
314            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
315
316            (DataType::Int32, ValueType::String)
317                if matches!(
318                    self.numeric_handling,
319                    NumericHandling::Relax {
320                        string_parsing: true
321                    }
322                ) =>
323            {
324                value
325                    .as_str()
326                    .unwrap()
327                    .parse::<i32>()
328                    .map_err(|_| create_error())?
329                    .into()
330            }
331            // ---- Int64 -----
332            (
333                DataType::Int64,
334                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
335            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
336
337            (DataType::Int64, ValueType::String)
338                if matches!(
339                    self.numeric_handling,
340                    NumericHandling::Relax {
341                        string_parsing: true
342                    }
343                ) =>
344            {
345                value
346                    .as_str()
347                    .unwrap()
348                    .parse::<i64>()
349                    .map_err(|_| create_error())?
350                    .into()
351            }
352            // ---- Float32 -----
353            (
354                DataType::Float32,
355                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
356            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
357                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
358            }
359            (DataType::Float32, ValueType::String)
360                if matches!(
361                    self.numeric_handling,
362                    NumericHandling::Relax {
363                        string_parsing: true
364                    }
365                ) =>
366            {
367                value
368                    .as_str()
369                    .unwrap()
370                    .parse::<f32>()
371                    .map_err(|_| create_error())?
372                    .into()
373            }
374            (DataType::Float32, ValueType::F64) => {
375                value.try_as_f32().map_err(|_| create_error())?.into()
376            }
377            // ---- Float64 -----
378            (
379                DataType::Float64,
380                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
381            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
382                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
383            }
384            (DataType::Float64, ValueType::String)
385                if matches!(
386                    self.numeric_handling,
387                    NumericHandling::Relax {
388                        string_parsing: true
389                    }
390                ) =>
391            {
392                value
393                    .as_str()
394                    .unwrap()
395                    .parse::<f64>()
396                    .map_err(|_| create_error())?
397                    .into()
398            }
399            (DataType::Float64, ValueType::F64) => {
400                value.try_as_f64().map_err(|_| create_error())?.into()
401            }
402            // ---- Decimal -----
403            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
404                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
405                    .map_err(|_| create_error())?
406                    .into()
407            }
408            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
409                let i64_val = value.try_as_i64().map_err(|_| create_error())?;
410                Decimal::from(i64_val).into()
411            }
412            (DataType::Decimal, ValueType::String) => {
413                let str_val = value.as_str().unwrap();
414                // the following values are special string generated by Debezium and should be handled separately
415                match str_val {
416                    "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
417                    "POSITIVE_INFINITY" => {
418                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
419                            Decimal::PositiveInf,
420                        ))));
421                    }
422                    "NEGATIVE_INFINITY" => {
423                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
424                            Decimal::NegativeInf,
425                        ))));
426                    }
427                    _ => {}
428                }
429
430                Decimal::from_str(str_val)
431                    .or_else(|_err| {
432                        try_base64_decode_decimal(
433                            str_val,
434                            self.bigint_unsigned_handling,
435                            create_error,
436                        )
437                    })?
438                    .into()
439            }
440
441            (DataType::Decimal, ValueType::F64) => {
442                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
443                    .map_err(|_| create_error())?
444                    .into()
445            }
446            (DataType::Decimal, ValueType::Object) => {
447                // ref https://github.com/risingwavelabs/risingwave/issues/10628
448                // handle debezium json (variable scale): {"scale": int, "value": bytes}
449                let scale = value
450                    .get("scale")
451                    .ok_or_else(create_error)?
452                    .as_i32()
453                    .unwrap();
454                let value = value
455                    .get("value")
456                    .ok_or_else(create_error)?
457                    .as_str()
458                    .unwrap()
459                    .as_bytes();
460                let unscaled = BigInt::from_signed_bytes_be(value);
461                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
462                ScalarImpl::Decimal(Decimal::Normalized(decimal))
463            }
464            // ---- Date -----
465            (
466                DataType::Date,
467                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
468            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
469                .map_err(|_| create_error())?
470                .into(),
471            (DataType::Date, ValueType::String) => value
472                .as_str()
473                .unwrap()
474                .parse::<Date>()
475                .map_err(|_| create_error())?
476                .into(),
477            // ---- Varchar -----
478            (DataType::Varchar, ValueType::String) => {
479                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
480            }
481            (
482                DataType::Varchar,
483                ValueType::Bool
484                | ValueType::I64
485                | ValueType::I128
486                | ValueType::U64
487                | ValueType::U128
488                | ValueType::F64,
489            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
490                value.to_string().into()
491            }
492            (
493                DataType::Varchar,
494                ValueType::Bool
495                | ValueType::I64
496                | ValueType::I128
497                | ValueType::U64
498                | ValueType::U128
499                | ValueType::F64
500                | ValueType::Array
501                | ValueType::Object,
502            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
503                value.to_string().into()
504            }
505            // ---- Time -----
506            (DataType::Time, ValueType::String) => value
507                .as_str()
508                .unwrap()
509                .parse::<Time>()
510                .map_err(|_| create_error())?
511                .into(),
512            (
513                DataType::Time,
514                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
515            ) => value
516                .as_i64()
517                .map(|i| match self.time_handling {
518                    TimeHandling::Milli => Time::with_milli(i as u32),
519                    TimeHandling::Micro => Time::with_micro(i as u64),
520                })
521                .unwrap()
522                .map_err(|_| create_error())?
523                .into(),
524            // ---- Timestamp -----
525            (DataType::Timestamp, ValueType::String) => value
526                .as_str()
527                .unwrap()
528                .parse::<Timestamp>()
529                .map_err(|_| create_error())?
530                .into(),
531            (
532                DataType::Timestamp,
533                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
534            ) => {
535                match self.timestamp_handling {
536                    // Only when user configures debezium.time.precision.mode = 'connect',
537                    // the Milli branch will be executed
538                    TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
539                        .map_err(|_| create_error())?
540                        .into(),
541                    TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
542                        .map_err(|_| create_error())?
543                        .into(),
544                }
545            }
546            // ---- Timestamptz -----
547            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
548                TimestamptzHandling::UtcWithoutSuffix => value
549                    .as_str()
550                    .unwrap()
551                    .parse::<Timestamp>()
552                    .map(|naive_utc| {
553                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
554                    })
555                    .map_err(|_| create_error())?
556                    .into(),
557                // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
558                _ => value
559                    .as_str()
560                    .unwrap()
561                    .parse::<Timestamptz>()
562                    .map_err(|_| create_error())?
563                    .into(),
564            },
565            (
566                DataType::Timestamptz,
567                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
568            ) => value
569                .as_i64()
570                .and_then(|num| match self.timestamptz_handling {
571                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
572                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
573                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
574                    // When explicitly requested string format, number without units are rejected.
575                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
576                })
577                .ok_or_else(create_error)?
578                .into(),
579            // ---- Interval -----
580            (DataType::Interval, ValueType::String) => value
581                .as_str()
582                .unwrap()
583                .parse::<Interval>()
584                .map_err(|_| create_error())?
585                .into(),
586            // ---- Struct -----
587            (DataType::Struct(struct_type_info), ValueType::Object) => {
588                // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
589                // https://github.com/rust-lang/rust/issues/48994
590                let mut fields = Vec::with_capacity(struct_type_info.len());
591                for (field_name, field_type) in struct_type_info.iter() {
592                    let field_value = json_object_get_case_insensitive(value, field_name)
593                            .unwrap_or_else(|| {
594                                let error = AccessError::Undefined {
595                                    name: field_name.to_owned(),
596                                    path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
597                                };
598                                // TODO: is it possible to unify the logging with the one in `do_action`?
599                                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =  LazyLock::new(LogSuppresser::default);
600                                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
601                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
602                                }
603                                &BorrowedValue::Static(simd_json::StaticNode::Null)
604                            });
605                    fields.push(
606                        self.parse(field_value, field_type)
607                            .map(|d| d.to_owned_datum())?,
608                    );
609                }
610                StructValue::new(fields).into()
611            }
612
613            // String containing json object, e.g. "{\"a\": 1, \"b\": 2}"
614            // Try to parse it as json object.
615            (DataType::Struct(_), ValueType::String)
616                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
617            {
618                // TODO: avoid copy by accepting `&mut BorrowedValue` in `parse` method.
619                let mut value = value.as_str().unwrap().as_bytes().to_vec();
620                let value =
621                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
622                return self
623                    .parse(&value, type_expected)
624                    .map(|d| d.to_owned_datum().into());
625            }
626
627            // ---- List -----
628            (DataType::List(list_type), ValueType::Array) => ListValue::new({
629                let item_type = list_type.elem();
630                let array = value.as_array().unwrap();
631                let mut builder = item_type.create_array_builder(array.len());
632                for v in array {
633                    let value = self.parse(v, item_type)?;
634                    builder.append(value);
635                }
636                builder.finish()
637            })
638            .into(),
639
640            // ---- Bytea -----
641            (DataType::Bytea, ValueType::String) => {
642                let value_str = value.as_str().unwrap();
643
644                match self.bytea_handling {
645                    ByteaHandling::Standard => {
646                        str_to_bytea(value_str).map_err(|_| create_error())?.into()
647                    }
648                    ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
649                        .decode(value_str)
650                        .map_err(|_| create_error())?
651                        .into_boxed_slice()
652                        .into(),
653                }
654            }
655            // ---- Jsonb -----
656            (DataType::Jsonb, ValueType::String)
657                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
658            {
659                // Check if this value is the Debezium unavailable value (TOAST handling for postgres-cdc).
660                // Debezium will base64 encode the bytea type placeholder.
661                // When a placeholder is encountered, it is converted into a jsonb format placeholder to match the original type.
662                match self.handle_toast_columns {
663                    true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
664                        .map_err(|_| create_error())?
665                        .into(),
666                    false => JsonbVal::from_str(value.as_str().unwrap())
667                        .map_err(|_| create_error())?
668                        .into(),
669                }
670            }
671            (DataType::Jsonb, _)
672                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
673            {
674                let value: serde_json::Value =
675                    value.clone().try_into().map_err(|_| create_error())?;
676                JsonbVal::from(value).into()
677            }
678            // ---- Int256 -----
679            (
680                DataType::Int256,
681                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
682            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
683
684            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
685                .map_err(|_| create_error())?
686                .into(),
687
688            (_expected, _got) => Err(create_error())?,
689        };
690        Ok(DatumCow::Owned(Some(v)))
691    }
692}
693
694/// Try to decode a base64-encoded decimal string for unsigned bigint handling in Precise mode.
695///
696/// This is used when processing CDC data from upstream systems with unsigned bigint (e.g., MySQL CDC).
697/// When users configure `debezium.bigint.unsigned.handling.mode='precise'`, Debezium converts
698/// unsigned bigint to base64-encoded decimal.
699///
700/// Reference: <https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode>.
701fn try_base64_decode_decimal(
702    str_val: &str,
703    bigint_unsigned_handling: BigintUnsignedHandlingMode,
704    create_error: impl Fn() -> AccessError,
705) -> Result<Decimal, AccessError> {
706    match bigint_unsigned_handling {
707        BigintUnsignedHandlingMode::Precise => {
708            // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema
709            // instead of string, as described in <https://github.com/risingwavelabs/risingwave/issues/16852>.
710            // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this
711            // after implementing that functionality.
712            let value = base64::engine::general_purpose::STANDARD
713                .decode(str_val)
714                .map_err(|_| create_error())?;
715            let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
716            Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
717        }
718        BigintUnsignedHandlingMode::Long => {
719            // In Long mode, don't attempt base64 decoding
720            Err(create_error())
721        }
722    }
723}
724
725pub struct JsonAccess<'a> {
726    value: BorrowedValue<'a>,
727    options: &'a JsonParseOptions,
728}
729
730impl<'a> JsonAccess<'a> {
731    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
732        Self { value, options }
733    }
734
735    pub fn new(value: BorrowedValue<'a>) -> Self {
736        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
737    }
738}
739
740impl Access for JsonAccess<'_> {
741    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
742        let mut value = &self.value;
743
744        for (idx, &key) in path.iter().enumerate() {
745            if let Some(sub_value) = if self.options.ignoring_keycase {
746                json_object_get_case_insensitive(value, key)
747            } else {
748                value.get(key)
749            } {
750                value = sub_value;
751            } else {
752                Err(AccessError::Undefined {
753                    name: key.to_owned(),
754                    path: path.iter().take(idx).join("."),
755                })?;
756            }
757        }
758
759        self.options.parse(value, type_expected)
760    }
761}
762
763/// Get a value from a json object by key, case insensitive.
764///
765/// Returns `None` if the given json value is not an object, or the key is not found.
766fn json_object_get_case_insensitive<'b>(
767    v: &'b simd_json::BorrowedValue<'b>,
768    key: &str,
769) -> Option<&'b simd_json::BorrowedValue<'b>> {
770    let obj = v.as_object()?;
771    let value = obj.get(key);
772    if value.is_some() {
773        return value; // fast path
774    }
775    for (k, v) in obj {
776        if k.eq_ignore_ascii_case(key) {
777            return Some(v);
778        }
779    }
780    None
781}