risingwave_connector/parser/unified/
json.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppressor;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
40/// Try to parse Debezium `PostGIS` `geometry` object.
41///
42/// Debezium represents `PostGIS` `geometry` as an object: `{"srid": <int>, "wkb": <base64_string>}`.
43/// For our current Postgres CDC ingestion, the `wkb` field is expected to be EWKB bytes (base64-encoded),
44/// and `srid` is redundant. We decode `wkb` into raw bytes and store it as `bytea`.
45///
46/// Note: Debezium provides an SMT to convert between `WKB` and `EWKB`, which may be useful for future
47/// unification across connectors (e.g., MySQL): see `GeometryFormatTransformer`.
48///
49/// Return semantics:
50/// - `Ok(Some(bytes))`: The input matches the Debezium geometry shape (`srid` is numeric AND `wkb` is string),
51///   and we successfully decoded `wkb` into bytes.
52/// - `Ok(None)`: The input does NOT look like a Debezium geometry object. This allows the caller to keep the
53///   match arm focused on dispatching, and avoids misclassifying other JSON objects that might map to `bytea`
54///   in the future.
55/// - `Err(...)`: The input looks like a Debezium geometry object, but decoding/parsing failed (e.g. invalid
56///   base64). This indicates a real data/format error and should not be silently ignored.
57fn try_parse_debezium_geometry_as_bytea(
58    value: &BorrowedValue<'_>,
59    create_error: impl Fn() -> AccessError,
60) -> AccessResult<Option<Box<[u8]>>> {
61    let obj = match value.as_object() {
62        Some(obj) => obj,
63        None => return Ok(None),
64    };
65
66    // Strictly identify the geometry object by checking both fields and their types.
67    // There may be other objects that map to bytea in the future.
68    let srid = obj.get("srid").and_then(|v| v.as_i64());
69    let wkb = obj.get("wkb").and_then(|v| v.as_str());
70
71    let (Some(_srid), Some(wkb)) = (srid, wkb) else {
72        return Ok(None);
73    };
74
75    let bytes = base64::engine::general_purpose::STANDARD
76        .decode(wkb)
77        .map_err(|_| create_error())?
78        .into_boxed_slice();
79
80    Ok(Some(bytes))
81}
82
/// Selects how a JSON string is decoded when the expected type is `bytea`.
#[derive(Debug, Clone)]
pub enum ByteaHandling {
    /// The string is converted with `str_to_bytea`.
    Standard,
    /// The string is base64-decoded (Debezium converts Postgres `bytea` to base64).
    Base64,
}
/// Unit assumed for an integer JSON value when the expected type is `time`.
#[derive(Debug, Clone)]
pub enum TimeHandling {
    /// The integer is a number of milliseconds.
    Milli,
    /// The integer is a number of microseconds.
    Micro,
}
94
/// How an unsigned bigint coming from the upstream is represented in the JSON payload.
#[derive(Debug, Clone, Copy)]
pub enum BigintUnsignedHandlingMode {
    /// Unsigned bigint is converted to a signed bigint (default).
    Long,
    /// Unsigned bigint arrives as a base64-encoded decimal (Debezium precise mode).
    Precise,
}
102
/// Wire format expected for values whose target type is `timestamptz`.
#[derive(Debug, Clone)]
pub enum TimestamptzHandling {
    /// A string such as `"2024-04-11T02:00:00.123456Z"`.
    UtcString,
    /// A string without a zone suffix such as `"2024-04-11 02:00:00.123456"`.
    UtcWithoutSuffix,
    /// An integer number of milliseconds such as `1712800800123`.
    Milli,
    /// An integer number of microseconds such as `1712800800123456`.
    Micro,
    /// The unit of an integer is guessed: both `1712800800123` (ms) and `1712800800123456` (us)
    /// map to `2024-04-11`.
    ///
    /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`.
    ///
    /// This option is backward compatible.
    GuessNumberUnit,
}
120
121impl TimestamptzHandling {
122    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
123
124    pub fn from_options(
125        options: &std::collections::BTreeMap<String, String>,
126    ) -> Result<Option<Self>, InvalidOptionError> {
127        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
128            Some("utc_string") => Self::UtcString,
129            Some("utc_without_suffix") => Self::UtcWithoutSuffix,
130            Some("micro") => Self::Micro,
131            Some("milli") => Self::Milli,
132            Some("guess_number_unit") => Self::GuessNumberUnit,
133            Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
134            None => return Ok(None),
135        };
136        Ok(Some(mode))
137    }
138}
139
/// Unit assumed for an integer JSON value when the expected type is `timestamp`.
#[derive(Debug, Clone)]
pub enum TimestampHandling {
    /// The integer is a number of milliseconds.
    Milli,
    /// The unit of the integer is guessed from its magnitude.
    GuessNumberUnit,
}
145
/// Selects which JSON inputs are accepted when the expected type is `jsonb`.
#[derive(Debug, Clone)]
pub enum JsonValueHandling {
    /// Any JSON value is converted into a `jsonb` value directly.
    AsValue,
    /// Only a JSON string is accepted; its content is parsed as the `jsonb` text.
    AsString,
}
/// Controls how lenient numeric parsing is.
#[derive(Debug, Clone)]
pub enum NumericHandling {
    /// No implicit conversion between JSON number kinds or from strings.
    Strict,
    /// Integers may be parsed to floats.
    Relax {
        /// Whether a string such as `"3.14"` may be parsed to the number `3.14`.
        string_parsing: bool,
    },
}
/// Controls how lenient boolean parsing is.
#[derive(Debug, Clone)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// Integers `1` and `0` may be parsed to booleans (Debezium).
    Relax {
        /// Whether the strings `"True"` / `"False"` may be parsed to `true` / `false`.
        string_parsing: bool,
        /// Whether the strings `"1"` / `"0"` may be parsed to booleans (Canal + MySQL).
        string_integer_parsing: bool,
    },
}
171
/// Selects which non-string JSON values may be cast to `varchar`.
#[derive(Debug, Clone)]
pub enum VarcharHandling {
    /// Do not allow other types to be cast to `varchar`.
    Strict,
    /// Allow primitive JSON values (Bool, I64, I128, U64, U128, F64) to be cast to `varchar`.
    OnlyPrimaryTypes,
    /// Allow every JSON value, including Array and Object, to be cast to `varchar`.
    AllTypes,
}
181
/// Selects which JSON inputs may be parsed to a struct.
#[derive(Debug, Clone)]
pub enum StructHandling {
    /// Only a JSON object may be parsed to a struct.
    Strict,
    /// Additionally allow a string containing a serialized JSON object
    /// (like `"{\"a\": 1, \"b\": 2}"`) to be parsed to a struct.
    AllowJsonString,
}
190
191#[derive(Clone, Debug)]
192pub struct JsonParseOptions {
193    pub bytea_handling: ByteaHandling,
194    pub time_handling: TimeHandling,
195    pub timestamp_handling: TimestampHandling,
196    pub timestamptz_handling: TimestamptzHandling,
197    pub json_value_handling: JsonValueHandling,
198    pub numeric_handling: NumericHandling,
199    pub boolean_handling: BooleanHandling,
200    pub varchar_handling: VarcharHandling,
201    pub struct_handling: StructHandling,
202    pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
203    pub ignoring_keycase: bool,
204    pub handle_toast_columns: bool,
205}
206
207impl Default for JsonParseOptions {
208    fn default() -> Self {
209        Self::DEFAULT.clone()
210    }
211}
212
213impl JsonParseOptions {
214    pub const CANAL: JsonParseOptions = JsonParseOptions {
215        bytea_handling: ByteaHandling::Standard,
216        time_handling: TimeHandling::Micro,
217        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
218        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
219        json_value_handling: JsonValueHandling::AsValue,
220        numeric_handling: NumericHandling::Relax {
221            string_parsing: true,
222        },
223        boolean_handling: BooleanHandling::Relax {
224            string_parsing: true,
225            string_integer_parsing: true,
226        },
227        varchar_handling: VarcharHandling::Strict,
228        struct_handling: StructHandling::Strict,
229        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
230        ignoring_keycase: true,
231        handle_toast_columns: false,
232    };
233    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
234        bytea_handling: ByteaHandling::Standard,
235        time_handling: TimeHandling::Micro,
236        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
237        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
238        json_value_handling: JsonValueHandling::AsValue,
239        numeric_handling: NumericHandling::Relax {
240            string_parsing: true,
241        },
242        boolean_handling: BooleanHandling::Strict,
243        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
244        struct_handling: StructHandling::AllowJsonString,
245        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
246        ignoring_keycase: true,
247        handle_toast_columns: false,
248    };
249
250    pub fn new_for_debezium(
251        timestamptz_handling: TimestamptzHandling,
252        timestamp_handling: TimestampHandling,
253        time_handling: TimeHandling,
254        bigint_unsigned_handling: BigintUnsignedHandlingMode,
255        handle_toast_columns: bool,
256    ) -> Self {
257        Self {
258            bytea_handling: ByteaHandling::Base64,
259            time_handling,
260            timestamp_handling,
261            timestamptz_handling,
262            json_value_handling: JsonValueHandling::AsString,
263            numeric_handling: NumericHandling::Relax {
264                string_parsing: false,
265            },
266            boolean_handling: BooleanHandling::Relax {
267                string_parsing: false,
268                string_integer_parsing: false,
269            },
270            varchar_handling: VarcharHandling::Strict,
271            struct_handling: StructHandling::Strict,
272            bigint_unsigned_handling,
273            ignoring_keycase: true,
274            handle_toast_columns,
275        }
276    }
277
278    pub fn parse<'a>(
279        &self,
280        value: &'a BorrowedValue<'a>,
281        type_expected: &DataType,
282    ) -> AccessResult<DatumCow<'a>> {
283        let create_error = || AccessError::TypeError {
284            expected: format!("{:?}", type_expected),
285            got: value.value_type().to_string(),
286            value: value.to_string(),
287        };
288        let v: ScalarImpl = match (type_expected, value.value_type()) {
289            (_, ValueType::Null) => return Ok(DatumCow::NULL),
290            // ---- Boolean -----
291            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
292
293            (
294                DataType::Boolean,
295                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
296            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
297                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
298            {
299                (value.as_i64() == Some(1i64)).into()
300            }
301
302            (DataType::Boolean, ValueType::String)
303                if matches!(
304                    self.boolean_handling,
305                    BooleanHandling::Relax {
306                        string_parsing: true,
307                        ..
308                    }
309                ) =>
310            {
311                match value.as_str().unwrap().to_lowercase().as_str() {
312                    "true" => true.into(),
313                    "false" => false.into(),
314                    c @ ("1" | "0")
315                        if matches!(
316                            self.boolean_handling,
317                            BooleanHandling::Relax {
318                                string_parsing: true,
319                                string_integer_parsing: true
320                            }
321                        ) =>
322                    {
323                        if c == "1" {
324                            true.into()
325                        } else {
326                            false.into()
327                        }
328                    }
329                    _ => Err(create_error())?,
330                }
331            }
332            // ---- Int16 -----
333            (
334                DataType::Int16,
335                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
336            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
337
338            (DataType::Int16, ValueType::String)
339                if matches!(
340                    self.numeric_handling,
341                    NumericHandling::Relax {
342                        string_parsing: true
343                    }
344                ) =>
345            {
346                value
347                    .as_str()
348                    .unwrap()
349                    .parse::<i16>()
350                    .map_err(|_| create_error())?
351                    .into()
352            }
353            // ---- Int32 -----
354            (
355                DataType::Int32,
356                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
357            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
358
359            (DataType::Int32, ValueType::String)
360                if matches!(
361                    self.numeric_handling,
362                    NumericHandling::Relax {
363                        string_parsing: true
364                    }
365                ) =>
366            {
367                value
368                    .as_str()
369                    .unwrap()
370                    .parse::<i32>()
371                    .map_err(|_| create_error())?
372                    .into()
373            }
374            // ---- Int64 -----
375            (
376                DataType::Int64,
377                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
378            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
379
380            (DataType::Int64, ValueType::String)
381                if matches!(
382                    self.numeric_handling,
383                    NumericHandling::Relax {
384                        string_parsing: true
385                    }
386                ) =>
387            {
388                value
389                    .as_str()
390                    .unwrap()
391                    .parse::<i64>()
392                    .map_err(|_| create_error())?
393                    .into()
394            }
395            // ---- Float32 -----
396            (
397                DataType::Float32,
398                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
399            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
400                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
401            }
402            (DataType::Float32, ValueType::String)
403                if matches!(
404                    self.numeric_handling,
405                    NumericHandling::Relax {
406                        string_parsing: true
407                    }
408                ) =>
409            {
410                value
411                    .as_str()
412                    .unwrap()
413                    .parse::<f32>()
414                    .map_err(|_| create_error())?
415                    .into()
416            }
417            (DataType::Float32, ValueType::F64) => {
418                value.try_as_f32().map_err(|_| create_error())?.into()
419            }
420            // ---- Float64 -----
421            (
422                DataType::Float64,
423                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
424            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
425                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
426            }
427            (DataType::Float64, ValueType::String)
428                if matches!(
429                    self.numeric_handling,
430                    NumericHandling::Relax {
431                        string_parsing: true
432                    }
433                ) =>
434            {
435                value
436                    .as_str()
437                    .unwrap()
438                    .parse::<f64>()
439                    .map_err(|_| create_error())?
440                    .into()
441            }
442            (DataType::Float64, ValueType::F64) => {
443                value.try_as_f64().map_err(|_| create_error())?.into()
444            }
445            // ---- Decimal -----
446            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
447                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
448                    .map_err(|_| create_error())?
449                    .into()
450            }
451            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
452                let i64_val = value.try_as_i64().map_err(|_| create_error())?;
453                Decimal::from(i64_val).into()
454            }
455            (DataType::Decimal, ValueType::String) => {
456                let str_val = value.as_str().unwrap();
457                // the following values are special string generated by Debezium and should be handled separately
458                match str_val {
459                    "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
460                    "POSITIVE_INFINITY" => {
461                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
462                            Decimal::PositiveInf,
463                        ))));
464                    }
465                    "NEGATIVE_INFINITY" => {
466                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
467                            Decimal::NegativeInf,
468                        ))));
469                    }
470                    _ => {}
471                }
472
473                Decimal::from_str(str_val)
474                    .or_else(|_err| {
475                        try_base64_decode_decimal(
476                            str_val,
477                            self.bigint_unsigned_handling,
478                            create_error,
479                        )
480                    })?
481                    .into()
482            }
483
484            (DataType::Decimal, ValueType::F64) => {
485                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
486                    .map_err(|_| create_error())?
487                    .into()
488            }
489            (DataType::Decimal, ValueType::Object) => {
490                // ref https://github.com/risingwavelabs/risingwave/issues/10628
491                // handle debezium json (variable scale): {"scale": int, "value": bytes}
492                let scale = value
493                    .get("scale")
494                    .ok_or_else(create_error)?
495                    .as_i32()
496                    .unwrap();
497                let value = value
498                    .get("value")
499                    .ok_or_else(create_error)?
500                    .as_str()
501                    .unwrap()
502                    .as_bytes();
503                let unscaled = BigInt::from_signed_bytes_be(value);
504                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
505                ScalarImpl::Decimal(Decimal::Normalized(decimal))
506            }
507            // ---- Date -----
508            (
509                DataType::Date,
510                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
511            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
512                .map_err(|_| create_error())?
513                .into(),
514            (DataType::Date, ValueType::String) => value
515                .as_str()
516                .unwrap()
517                .parse::<Date>()
518                .map_err(|_| create_error())?
519                .into(),
520            // ---- Varchar -----
521            (DataType::Varchar, ValueType::String) => {
522                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
523            }
524            (
525                DataType::Varchar,
526                ValueType::Bool
527                | ValueType::I64
528                | ValueType::I128
529                | ValueType::U64
530                | ValueType::U128
531                | ValueType::F64,
532            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
533                value.to_string().into()
534            }
535            (
536                DataType::Varchar,
537                ValueType::Bool
538                | ValueType::I64
539                | ValueType::I128
540                | ValueType::U64
541                | ValueType::U128
542                | ValueType::F64
543                | ValueType::Array
544                | ValueType::Object,
545            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
546                value.to_string().into()
547            }
548            // ---- Time -----
549            (DataType::Time, ValueType::String) => value
550                .as_str()
551                .unwrap()
552                .parse::<Time>()
553                .map_err(|_| create_error())?
554                .into(),
555            (
556                DataType::Time,
557                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
558            ) => value
559                .as_i64()
560                .map(|i| match self.time_handling {
561                    TimeHandling::Milli => Time::with_milli(i as u32),
562                    TimeHandling::Micro => Time::with_micro(i as u64),
563                })
564                .unwrap()
565                .map_err(|_| create_error())?
566                .into(),
567            // ---- Timestamp -----
568            (DataType::Timestamp, ValueType::String) => value
569                .as_str()
570                .unwrap()
571                .parse::<Timestamp>()
572                .map_err(|_| create_error())?
573                .into(),
574            (
575                DataType::Timestamp,
576                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
577            ) => {
578                match self.timestamp_handling {
579                    // Only when user configures debezium.time.precision.mode = 'connect',
580                    // the Milli branch will be executed
581                    TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
582                        .map_err(|_| create_error())?
583                        .into(),
584                    TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
585                        .map_err(|_| create_error())?
586                        .into(),
587                }
588            }
589            // ---- Timestamptz -----
590            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
591                TimestamptzHandling::UtcWithoutSuffix => value
592                    .as_str()
593                    .unwrap()
594                    .parse::<Timestamp>()
595                    .map(|naive_utc| {
596                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
597                    })
598                    .map_err(|_| create_error())?
599                    .into(),
600                // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
601                _ => value
602                    .as_str()
603                    .unwrap()
604                    .parse::<Timestamptz>()
605                    .map_err(|_| create_error())?
606                    .into(),
607            },
608            (
609                DataType::Timestamptz,
610                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
611            ) => value
612                .as_i64()
613                .and_then(|num| match self.timestamptz_handling {
614                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
615                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
616                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
617                    // When explicitly requested string format, number without units are rejected.
618                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
619                })
620                .ok_or_else(create_error)?
621                .into(),
622            // ---- Interval -----
623            (DataType::Interval, ValueType::String) => value
624                .as_str()
625                .unwrap()
626                .parse::<Interval>()
627                .map_err(|_| create_error())?
628                .into(),
629            // ---- Struct -----
630            (DataType::Struct(struct_type_info), ValueType::Object) => {
631                // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
632                // https://github.com/rust-lang/rust/issues/48994
633                let mut fields = Vec::with_capacity(struct_type_info.len());
634                for (field_name, field_type) in struct_type_info.iter() {
635                    let field_value = json_object_get_case_insensitive(value, field_name)
636                            .unwrap_or_else(|| {
637                                let error = AccessError::Undefined {
638                                    name: field_name.to_owned(),
639                                    path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
640                                };
641                                // TODO: is it possible to unify the logging with the one in `do_action`?
642                                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =  LazyLock::new(LogSuppressor::default);
643                                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
644                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
645                                }
646                                &BorrowedValue::Static(simd_json::StaticNode::Null)
647                            });
648                    fields.push(
649                        self.parse(field_value, field_type)
650                            .map(|d| d.to_owned_datum())?,
651                    );
652                }
653                StructValue::new(fields).into()
654            }
655
656            // String containing json object, e.g. "{\"a\": 1, \"b\": 2}"
657            // Try to parse it as json object.
658            (DataType::Struct(_), ValueType::String)
659                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
660            {
661                // TODO: avoid copy by accepting `&mut BorrowedValue` in `parse` method.
662                let mut value = value.as_str().unwrap().as_bytes().to_vec();
663                let value =
664                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
665                return self
666                    .parse(&value, type_expected)
667                    .map(|d| d.to_owned_datum().into());
668            }
669
670            // ---- List -----
671            (DataType::List(list_type), ValueType::Array) => ListValue::new({
672                let item_type = list_type.elem();
673                let array = value.as_array().unwrap();
674                let mut builder = item_type.create_array_builder(array.len());
675                for v in array {
676                    let value = self.parse(v, item_type)?;
677                    builder.append(value);
678                }
679                builder.finish()
680            })
681            .into(),
682
683            // ---- Bytea -----
684            (DataType::Bytea, ValueType::String) => {
685                let value_str = value.as_str().unwrap();
686
687                match self.bytea_handling {
688                    ByteaHandling::Standard => {
689                        let mut buf = Vec::new();
690                        str_to_bytea(value_str, &mut buf).map_err(|_| create_error())?;
691                        buf.into()
692                    }
693                    ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
694                        .decode(value_str)
695                        .map_err(|_| create_error())?
696                        .into_boxed_slice()
697                        .into(),
698                }
699            }
700            // Handle Debezium PostGIS geometry type: {"srid": <int>, "wkb": <base64_string>}
701            // We extract the wkb field and decode it as EWKB bytes
702            (DataType::Bytea, ValueType::Object) => {
703                match try_parse_debezium_geometry_as_bytea(value, create_error)? {
704                    Some(bytes) => bytes.into(),
705                    None => Err(create_error())?,
706                }
707            }
708            // ---- Jsonb -----
709            (DataType::Jsonb, ValueType::String)
710                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
711            {
712                // Check if this value is the Debezium unavailable value (TOAST handling for postgres-cdc).
713                // Debezium will base64 encode the bytea type placeholder.
714                // When a placeholder is encountered, it is converted into a jsonb format placeholder to match the original type.
715                match self.handle_toast_columns {
716                    true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
717                        .map_err(|_| create_error())?
718                        .into(),
719                    false => JsonbVal::from_str(value.as_str().unwrap())
720                        .map_err(|_| create_error())?
721                        .into(),
722                }
723            }
724            (DataType::Jsonb, _)
725                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
726            {
727                let value: serde_json::Value =
728                    value.clone().try_into().map_err(|_| create_error())?;
729                JsonbVal::from(value).into()
730            }
731            // ---- Int256 -----
732            (
733                DataType::Int256,
734                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
735            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
736
737            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
738                .map_err(|_| create_error())?
739                .into(),
740
741            (_expected, _got) => Err(create_error())?,
742        };
743        Ok(DatumCow::Owned(Some(v)))
744    }
745}
746
747/// Try to decode a base64-encoded decimal string for unsigned bigint handling in Precise mode.
748///
749/// This is used when processing CDC data from upstream systems with unsigned bigint (e.g., MySQL CDC).
750/// When users configure `debezium.bigint.unsigned.handling.mode='precise'`, Debezium converts
751/// unsigned bigint to base64-encoded decimal.
752///
753/// Reference: <https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode>.
754fn try_base64_decode_decimal(
755    str_val: &str,
756    bigint_unsigned_handling: BigintUnsignedHandlingMode,
757    create_error: impl Fn() -> AccessError,
758) -> Result<Decimal, AccessError> {
759    match bigint_unsigned_handling {
760        BigintUnsignedHandlingMode::Precise => {
761            // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema
762            // instead of string, as described in <https://github.com/risingwavelabs/risingwave/issues/16852>.
763            // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this
764            // after implementing that functionality.
765            let value = base64::engine::general_purpose::STANDARD
766                .decode(str_val)
767                .map_err(|_| create_error())?;
768            let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
769            Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
770        }
771        BigintUnsignedHandlingMode::Long => {
772            // In Long mode, don't attempt base64 decoding
773            Err(create_error())
774        }
775    }
776}
777
778pub struct JsonAccess<'a> {
779    value: BorrowedValue<'a>,
780    options: &'a JsonParseOptions,
781}
782
783impl<'a> JsonAccess<'a> {
784    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
785        Self { value, options }
786    }
787
788    pub fn new(value: BorrowedValue<'a>) -> Self {
789        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
790    }
791}
792
793impl Access for JsonAccess<'_> {
794    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
795        let mut value = &self.value;
796
797        for (idx, &key) in path.iter().enumerate() {
798            if let Some(sub_value) = if self.options.ignoring_keycase {
799                json_object_get_case_insensitive(value, key)
800            } else {
801                value.get(key)
802            } {
803                value = sub_value;
804            } else {
805                Err(AccessError::Undefined {
806                    name: key.to_owned(),
807                    path: path.iter().take(idx).join("."),
808                })?;
809            }
810        }
811
812        self.options.parse(value, type_expected)
813    }
814}
815
816/// Get a value from a json object by key, case insensitive.
817///
818/// Returns `None` if the given json value is not an object, or the key is not found.
819fn json_object_get_case_insensitive<'b>(
820    v: &'b simd_json::BorrowedValue<'b>,
821    key: &str,
822) -> Option<&'b simd_json::BorrowedValue<'b>> {
823    let obj = v.as_object()?;
824    let value = obj.get(key);
825    if value.is_some() {
826        return value; // fast path
827    }
828    for (k, v) in obj {
829        if k.eq_ignore_ascii_case(key) {
830            return Some(v);
831        }
832    }
833    None
834}