risingwave_connector/parser/unified/
json.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{Finite32, ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppressor;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum, VectorVal,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
40/// Try to parse Debezium `PostGIS` `geometry` object.
41///
42/// Debezium represents `PostGIS` `geometry` as an object: `{"srid": <int>, "wkb": <base64_string>}`.
43/// For our current Postgres CDC ingestion, the `wkb` field is expected to be EWKB bytes (base64-encoded),
44/// and `srid` is redundant. We decode `wkb` into raw bytes and store it as `bytea`.
45///
46/// Note: Debezium provides an SMT to convert between `WKB` and `EWKB`, which may be useful for future
47/// unification across connectors (e.g., MySQL): see `GeometryFormatTransformer`.
48///
49/// Return semantics:
50/// - `Ok(Some(bytes))`: The input matches the Debezium geometry shape (`srid` is numeric AND `wkb` is string),
51///   and we successfully decoded `wkb` into bytes.
52/// - `Ok(None)`: The input does NOT look like a Debezium geometry object. This allows the caller to keep the
53///   match arm focused on dispatching, and avoids misclassifying other JSON objects that might map to `bytea`
54///   in the future.
55/// - `Err(...)`: The input looks like a Debezium geometry object, but decoding/parsing failed (e.g. invalid
56///   base64). This indicates a real data/format error and should not be silently ignored.
57fn try_parse_debezium_geometry_as_bytea(
58    value: &BorrowedValue<'_>,
59    create_error: impl Fn() -> AccessError,
60) -> AccessResult<Option<Box<[u8]>>> {
61    let obj = match value.as_object() {
62        Some(obj) => obj,
63        None => return Ok(None),
64    };
65
66    // Strictly identify the geometry object by checking both fields and their types.
67    // There may be other objects that map to bytea in the future.
68    let srid = obj.get("srid").and_then(|v| v.as_i64());
69    let wkb = obj.get("wkb").and_then(|v| v.as_str());
70
71    let (Some(_srid), Some(wkb)) = (srid, wkb) else {
72        return Ok(None);
73    };
74
75    let bytes = base64::engine::general_purpose::STANDARD
76        .decode(wkb)
77        .map_err(|_| create_error())?
78        .into_boxed_slice();
79
80    Ok(Some(bytes))
81}
82
/// Selects how a JSON string is decoded when the expected type is `bytea`.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string as standard base64; Debezium converts Postgres `bytea` to base64 format.
    Base64,
}
/// Unit of an integer JSON value when the expected type is `time`.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
94
/// Handling mode for unsigned bigint values.
///
/// `JsonParseOptions::parse` forwards it to `try_base64_decode_decimal` when a string expected to
/// be a `decimal` cannot be parsed as a plain decimal literal.
#[derive(Clone, Copy, Debug)]
pub enum BigintUnsignedHandlingMode {
    /// Convert unsigned bigint to signed bigint (default)
    Long,
    /// Use base64-encoded decimal for unsigned bigint (Debezium precise mode)
    Precise,
}
102
/// Selects the accepted representation of `timestamptz` values.
///
/// For JSON strings, only `UtcWithoutSuffix` changes the parsing (the string is read as a naive
/// timestamp and taken as UTC); every other mode parses the string as a `Timestamptz`.
/// For JSON integers, `Milli`, `Micro` and `GuessNumberUnit` pick the unit, while the two string
/// modes reject integers.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// `"2024-04-11T02:00:00.123456Z"`
    UtcString,
    /// `"2024-04-11 02:00:00.123456"`
    UtcWithoutSuffix,
    /// `1712800800123`
    Milli,
    /// `1712800800123456`
    Micro,
    /// Both `1712800800123` (ms) and `1712800800123456` (us) maps to `2024-04-11`.
    ///
    /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`.
    ///
    /// This option is backward compatible.
    GuessNumberUnit,
}
120
121impl TimestamptzHandling {
122    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
123
124    pub fn from_options(value: &str) -> Result<Self, InvalidOptionError> {
125        let mode = match value {
126            "utc_string" => Self::UtcString,
127            "utc_without_suffix" => Self::UtcWithoutSuffix,
128            "micro" => Self::Micro,
129            "milli" => Self::Milli,
130            "guess_number_unit" => Self::GuessNumberUnit,
131            v => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
132        };
133        Ok(mode)
134    }
135}
136
/// Unit of an integer JSON value when the expected type is `timestamp`.
#[derive(Clone, Debug)]
pub enum TimestampHandling {
    /// The integer is passed to `Timestamp::with_millis`.
    Milli,
    /// The integer is passed to `i64_to_timestamp`, which picks the unit itself.
    GuessNumberUnit,
}
142
/// Selects which JSON inputs are accepted when the expected type is `jsonb`.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any non-null JSON value is converted into a `jsonb` value as is.
    AsValue,
    /// Only a JSON string is accepted; its content is parsed as the `jsonb` text.
    AsString,
}
/// Controls the implicit conversions allowed into integer and float columns.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// JSON integers are not accepted for float columns and JSON strings are not parsed as
    /// numbers.
    Strict,
    /// JSON integers are accepted for `float32` / `float64` columns.
    Relax {
        /// Whether a JSON string such as `"3.14"` is parsed into the expected integer or float
        /// type.
        string_parsing: bool,
    },
}
/// Controls the implicit conversions allowed into `boolean` columns.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// The integers `1` and `0` are additionally parsed as booleans (Debezium).
    Relax {
        /// Whether the strings `"true"` / `"false"` (compared case-insensitively) are parsed as
        /// booleans.
        string_parsing: bool,
        /// Whether the strings `"1"` / `"0"` are parsed as booleans (Canal + MySQL).
        /// Only takes effect when `string_parsing` is also `true`.
        string_integer_parsing: bool,
    },
}
168
/// Controls which non-string JSON values may be rendered into `varchar` columns.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Do not allow other types to be cast to varchar.
    Strict,
    /// Allow JSON primitive values (Bool, I64, I128, U64, U128, F64) to be cast to varchar.
    /// A JSON null always yields SQL `NULL`, whatever the mode.
    OnlyPrimaryTypes,
    /// Allow all types to be cast to varchar (including Array and Object).
    AllTypes,
}
178
/// Controls which JSON inputs are accepted when the expected type is a struct.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only allow a JSON object to be parsed into a struct.
    Strict,
    /// Additionally allow a string containing a serialized JSON object
    /// (like `"{\"a\": 1, \"b\": 2}"`) to be parsed into a struct.
    AllowJsonString,
}
187
/// Per-format switches that decide how `JsonParseOptions::parse` converts a JSON value into a
/// datum of the expected data type.
#[derive(Clone, Debug)]
pub struct JsonParseOptions {
    /// Decoding of JSON strings into `bytea`.
    pub bytea_handling: ByteaHandling,
    /// Unit of JSON integers converted into `time`.
    pub time_handling: TimeHandling,
    /// Unit of JSON integers converted into `timestamp`.
    pub timestamp_handling: TimestampHandling,
    /// Accepted representation of `timestamptz` values.
    pub timestamptz_handling: TimestamptzHandling,
    /// Accepted JSON inputs for `jsonb`.
    pub json_value_handling: JsonValueHandling,
    /// Implicit conversions into integer and float types.
    pub numeric_handling: NumericHandling,
    /// Implicit conversions into `boolean`.
    pub boolean_handling: BooleanHandling,
    /// Implicit conversions into `varchar`.
    pub varchar_handling: VarcharHandling,
    /// Accepted JSON inputs for struct types.
    pub struct_handling: StructHandling,
    /// Forwarded to `try_base64_decode_decimal` when a `decimal` string is not a plain decimal
    /// literal.
    pub bigint_unsigned_handling: BigintUnsignedHandlingMode,
    /// NOTE(review): not consulted by `parse` in this file, which always looks up struct fields
    /// case-insensitively; presumably read by callers elsewhere (confirm).
    pub ignoring_keycase: bool,
    /// When `true` and `json_value_handling` is `AsString`, `jsonb` strings are parsed with
    /// `JsonbVal::from_debezium_unavailable_value` (TOAST placeholder handling for postgres-cdc).
    pub handle_toast_columns: bool,
}
203
204impl Default for JsonParseOptions {
205    fn default() -> Self {
206        Self::DEFAULT.clone()
207    }
208}
209
210impl JsonParseOptions {
211    pub const CANAL: JsonParseOptions = JsonParseOptions {
212        bytea_handling: ByteaHandling::Standard,
213        time_handling: TimeHandling::Micro,
214        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
215        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
216        json_value_handling: JsonValueHandling::AsValue,
217        numeric_handling: NumericHandling::Relax {
218            string_parsing: true,
219        },
220        boolean_handling: BooleanHandling::Relax {
221            string_parsing: true,
222            string_integer_parsing: true,
223        },
224        varchar_handling: VarcharHandling::Strict,
225        struct_handling: StructHandling::Strict,
226        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
227        ignoring_keycase: true,
228        handle_toast_columns: false,
229    };
230    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
231        bytea_handling: ByteaHandling::Standard,
232        time_handling: TimeHandling::Micro,
233        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
234        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
235        json_value_handling: JsonValueHandling::AsValue,
236        numeric_handling: NumericHandling::Relax {
237            string_parsing: true,
238        },
239        boolean_handling: BooleanHandling::Strict,
240        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
241        struct_handling: StructHandling::AllowJsonString,
242        bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode
243        ignoring_keycase: true,
244        handle_toast_columns: false,
245    };
246
247    pub fn new_for_debezium(
248        timestamptz_handling: TimestamptzHandling,
249        timestamp_handling: TimestampHandling,
250        time_handling: TimeHandling,
251        bigint_unsigned_handling: BigintUnsignedHandlingMode,
252        handle_toast_columns: bool,
253    ) -> Self {
254        Self {
255            bytea_handling: ByteaHandling::Base64,
256            time_handling,
257            timestamp_handling,
258            timestamptz_handling,
259            json_value_handling: JsonValueHandling::AsString,
260            numeric_handling: NumericHandling::Relax {
261                string_parsing: false,
262            },
263            boolean_handling: BooleanHandling::Relax {
264                string_parsing: false,
265                string_integer_parsing: false,
266            },
267            varchar_handling: VarcharHandling::Strict,
268            struct_handling: StructHandling::Strict,
269            bigint_unsigned_handling,
270            ignoring_keycase: true,
271            handle_toast_columns,
272        }
273    }
274
275    pub fn parse<'a>(
276        &self,
277        value: &'a BorrowedValue<'a>,
278        type_expected: &DataType,
279    ) -> AccessResult<DatumCow<'a>> {
280        let create_error = || AccessError::TypeError {
281            expected: format!("{:?}", type_expected),
282            got: value.value_type().to_string(),
283            value: value.to_string(),
284        };
285        let v: ScalarImpl = match (type_expected, value.value_type()) {
286            (_, ValueType::Null) => return Ok(DatumCow::NULL),
287            // ---- Boolean -----
288            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
289
290            (
291                DataType::Boolean,
292                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
293            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
294                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
295            {
296                (value.as_i64() == Some(1i64)).into()
297            }
298
299            (DataType::Boolean, ValueType::String)
300                if matches!(
301                    self.boolean_handling,
302                    BooleanHandling::Relax {
303                        string_parsing: true,
304                        ..
305                    }
306                ) =>
307            {
308                match value.as_str().unwrap().to_lowercase().as_str() {
309                    "true" => true.into(),
310                    "false" => false.into(),
311                    c @ ("1" | "0")
312                        if matches!(
313                            self.boolean_handling,
314                            BooleanHandling::Relax {
315                                string_parsing: true,
316                                string_integer_parsing: true
317                            }
318                        ) =>
319                    {
320                        if c == "1" {
321                            true.into()
322                        } else {
323                            false.into()
324                        }
325                    }
326                    _ => Err(create_error())?,
327                }
328            }
329            // ---- Int16 -----
330            (
331                DataType::Int16,
332                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
333            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
334
335            (DataType::Int16, ValueType::String)
336                if matches!(
337                    self.numeric_handling,
338                    NumericHandling::Relax {
339                        string_parsing: true
340                    }
341                ) =>
342            {
343                value
344                    .as_str()
345                    .unwrap()
346                    .parse::<i16>()
347                    .map_err(|_| create_error())?
348                    .into()
349            }
350            // ---- Int32 -----
351            (
352                DataType::Int32,
353                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
354            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
355
356            (DataType::Int32, ValueType::String)
357                if matches!(
358                    self.numeric_handling,
359                    NumericHandling::Relax {
360                        string_parsing: true
361                    }
362                ) =>
363            {
364                value
365                    .as_str()
366                    .unwrap()
367                    .parse::<i32>()
368                    .map_err(|_| create_error())?
369                    .into()
370            }
371            // ---- Int64 -----
372            (
373                DataType::Int64,
374                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
375            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
376
377            (DataType::Int64, ValueType::String)
378                if matches!(
379                    self.numeric_handling,
380                    NumericHandling::Relax {
381                        string_parsing: true
382                    }
383                ) =>
384            {
385                value
386                    .as_str()
387                    .unwrap()
388                    .parse::<i64>()
389                    .map_err(|_| create_error())?
390                    .into()
391            }
392            // ---- Float32 -----
393            (
394                DataType::Float32,
395                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
396            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
397                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
398            }
399            (DataType::Float32, ValueType::String)
400                if matches!(
401                    self.numeric_handling,
402                    NumericHandling::Relax {
403                        string_parsing: true
404                    }
405                ) =>
406            {
407                value
408                    .as_str()
409                    .unwrap()
410                    .parse::<f32>()
411                    .map_err(|_| create_error())?
412                    .into()
413            }
414            (DataType::Float32, ValueType::F64) => {
415                value.try_as_f32().map_err(|_| create_error())?.into()
416            }
417            // ---- Float64 -----
418            (
419                DataType::Float64,
420                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
421            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
422                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
423            }
424            (DataType::Float64, ValueType::String)
425                if matches!(
426                    self.numeric_handling,
427                    NumericHandling::Relax {
428                        string_parsing: true
429                    }
430                ) =>
431            {
432                value
433                    .as_str()
434                    .unwrap()
435                    .parse::<f64>()
436                    .map_err(|_| create_error())?
437                    .into()
438            }
439            (DataType::Float64, ValueType::F64) => {
440                value.try_as_f64().map_err(|_| create_error())?.into()
441            }
442            // ---- Decimal -----
443            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
444                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
445                    .map_err(|_| create_error())?
446                    .into()
447            }
448            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
449                let i64_val = value.try_as_i64().map_err(|_| create_error())?;
450                Decimal::from(i64_val).into()
451            }
452            (DataType::Decimal, ValueType::String) => {
453                let str_val = value.as_str().unwrap();
454                // the following values are special string generated by Debezium and should be handled separately
455                match str_val {
456                    "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))),
457                    "POSITIVE_INFINITY" => {
458                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
459                            Decimal::PositiveInf,
460                        ))));
461                    }
462                    "NEGATIVE_INFINITY" => {
463                        return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(
464                            Decimal::NegativeInf,
465                        ))));
466                    }
467                    _ => {}
468                }
469
470                Decimal::from_str(str_val)
471                    .or_else(|_err| {
472                        try_base64_decode_decimal(
473                            str_val,
474                            self.bigint_unsigned_handling,
475                            create_error,
476                        )
477                    })?
478                    .into()
479            }
480
481            (DataType::Decimal, ValueType::F64) => {
482                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
483                    .map_err(|_| create_error())?
484                    .into()
485            }
486            (DataType::Decimal, ValueType::Object) => {
487                // ref https://github.com/risingwavelabs/risingwave/issues/10628
488                // handle debezium json (variable scale): {"scale": int, "value": bytes}
489                let scale = value
490                    .get("scale")
491                    .ok_or_else(create_error)?
492                    .as_i32()
493                    .unwrap();
494                let value = value
495                    .get("value")
496                    .ok_or_else(create_error)?
497                    .as_str()
498                    .unwrap()
499                    .as_bytes();
500                let unscaled = BigInt::from_signed_bytes_be(value);
501                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
502                ScalarImpl::Decimal(Decimal::Normalized(decimal))
503            }
504            // ---- Date -----
505            (
506                DataType::Date,
507                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
508            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
509                .map_err(|_| create_error())?
510                .into(),
511            (DataType::Date, ValueType::String) => value
512                .as_str()
513                .unwrap()
514                .parse::<Date>()
515                .map_err(|_| create_error())?
516                .into(),
517            // ---- Varchar -----
518            (DataType::Varchar, ValueType::String) => {
519                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
520            }
521            (
522                DataType::Varchar,
523                ValueType::Bool
524                | ValueType::I64
525                | ValueType::I128
526                | ValueType::U64
527                | ValueType::U128
528                | ValueType::F64,
529            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
530                value.to_string().into()
531            }
532            (
533                DataType::Varchar,
534                ValueType::Bool
535                | ValueType::I64
536                | ValueType::I128
537                | ValueType::U64
538                | ValueType::U128
539                | ValueType::F64
540                | ValueType::Array
541                | ValueType::Object,
542            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
543                value.to_string().into()
544            }
545            // ---- Time -----
546            (DataType::Time, ValueType::String) => value
547                .as_str()
548                .unwrap()
549                .parse::<Time>()
550                .map_err(|_| create_error())?
551                .into(),
552            (
553                DataType::Time,
554                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
555            ) => value
556                .as_i64()
557                .map(|i| match self.time_handling {
558                    TimeHandling::Milli => Time::with_milli(i as u32),
559                    TimeHandling::Micro => Time::with_micro(i as u64),
560                })
561                .unwrap()
562                .map_err(|_| create_error())?
563                .into(),
564            // ---- Timestamp -----
565            (DataType::Timestamp, ValueType::String) => value
566                .as_str()
567                .unwrap()
568                .parse::<Timestamp>()
569                .map_err(|_| create_error())?
570                .into(),
571            (
572                DataType::Timestamp,
573                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
574            ) => {
575                match self.timestamp_handling {
576                    // Only when user configures debezium.time.precision.mode = 'connect',
577                    // the Milli branch will be executed
578                    TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
579                        .map_err(|_| create_error())?
580                        .into(),
581                    TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
582                        .map_err(|_| create_error())?
583                        .into(),
584                }
585            }
586            // ---- Timestamptz -----
587            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
588                TimestamptzHandling::UtcWithoutSuffix => value
589                    .as_str()
590                    .unwrap()
591                    .parse::<Timestamp>()
592                    .map(|naive_utc| {
593                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
594                    })
595                    .map_err(|_| create_error())?
596                    .into(),
597                // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
598                _ => value
599                    .as_str()
600                    .unwrap()
601                    .parse::<Timestamptz>()
602                    .map_err(|_| create_error())?
603                    .into(),
604            },
605            (
606                DataType::Timestamptz,
607                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
608            ) => value
609                .as_i64()
610                .and_then(|num| match self.timestamptz_handling {
611                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
612                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
613                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
614                    // When explicitly requested string format, number without units are rejected.
615                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
616                })
617                .ok_or_else(create_error)?
618                .into(),
619            // ---- Interval -----
620            (DataType::Interval, ValueType::String) => value
621                .as_str()
622                .unwrap()
623                .parse::<Interval>()
624                .map_err(|_| create_error())?
625                .into(),
626            // ---- Struct -----
627            (DataType::Struct(struct_type_info), ValueType::Object) => {
628                // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
629                // https://github.com/rust-lang/rust/issues/48994
630                let mut fields = Vec::with_capacity(struct_type_info.len());
631                for (field_name, field_type) in struct_type_info.iter() {
632                    let field_value = json_object_get_case_insensitive(value, field_name)
633                            .unwrap_or_else(|| {
634                                let error = AccessError::Undefined {
635                                    name: field_name.to_owned(),
636                                    path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
637                                };
638                                // TODO: is it possible to unify the logging with the one in `do_action`?
639                                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =  LazyLock::new(LogSuppressor::default);
640                                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
641                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
642                                }
643                                &BorrowedValue::Static(simd_json::StaticNode::Null)
644                            });
645                    fields.push(
646                        self.parse(field_value, field_type)
647                            .map(|d| d.to_owned_datum())?,
648                    );
649                }
650                StructValue::new(fields).into()
651            }
652
653            // String containing json object, e.g. "{\"a\": 1, \"b\": 2}"
654            // Try to parse it as json object.
655            (DataType::Struct(_), ValueType::String)
656                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
657            {
658                // TODO: avoid copy by accepting `&mut BorrowedValue` in `parse` method.
659                let mut value = value.as_str().unwrap().as_bytes().to_vec();
660                let value =
661                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
662                return self
663                    .parse(&value, type_expected)
664                    .map(|d| d.to_owned_datum().into());
665            }
666
667            // ---- List -----
668            (DataType::List(list_type), ValueType::Array) => ListValue::new({
669                let item_type = list_type.elem();
670                let array = value.as_array().unwrap();
671                let mut builder = item_type.create_array_builder(array.len());
672                for v in array {
673                    let value = self.parse(v, item_type)?;
674                    builder.append(value);
675                }
676                builder.finish()
677            })
678            .into(),
679            // ---- Vector -----
680            (DataType::Vector(size), ValueType::Array) => {
681                let array = value.as_array().unwrap();
682                if array.len() != *size {
683                    Err(create_error())?
684                }
685                let mut elems = Vec::with_capacity(array.len());
686                for v in array {
687                    let value = match v.value_type() {
688                        ValueType::I64 | ValueType::I128 => {
689                            let i128_value = v.try_as_i128().map_err(|_| create_error())?;
690                            i128_value
691                                .to_string()
692                                .parse::<f32>()
693                                .map_err(|_| create_error())?
694                        }
695                        ValueType::U64 | ValueType::U128 => {
696                            let u128_value = v.try_as_u128().map_err(|_| create_error())?;
697                            u128_value
698                                .to_string()
699                                .parse::<f32>()
700                                .map_err(|_| create_error())?
701                        }
702                        ValueType::F64 => {
703                            let f64_value = v.try_as_f64().map_err(|_| create_error())?;
704                            if !f64_value.is_finite() {
705                                Err(create_error())?
706                            }
707                            f64_value
708                                .to_string()
709                                .parse::<f32>()
710                                .map_err(|_| create_error())?
711                        }
712                        _ => Err(create_error())?,
713                    };
714                    let finite = Finite32::try_from(value).map_err(|_| create_error())?;
715                    elems.push(finite);
716                }
717                VectorVal::from(elems).into()
718            }
719
720            // ---- Bytea -----
721            (DataType::Bytea, ValueType::String) => {
722                let value_str = value.as_str().unwrap();
723
724                match self.bytea_handling {
725                    ByteaHandling::Standard => {
726                        let mut buf = Vec::new();
727                        str_to_bytea(value_str, &mut buf).map_err(|_| create_error())?;
728                        buf.into()
729                    }
730                    ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
731                        .decode(value_str)
732                        .map_err(|_| create_error())?
733                        .into_boxed_slice()
734                        .into(),
735                }
736            }
737            // Handle Debezium PostGIS geometry type: {"srid": <int>, "wkb": <base64_string>}
738            // We extract the wkb field and decode it as EWKB bytes
739            (DataType::Bytea, ValueType::Object) => {
740                match try_parse_debezium_geometry_as_bytea(value, create_error)? {
741                    Some(bytes) => bytes.into(),
742                    None => Err(create_error())?,
743                }
744            }
745            // ---- Jsonb -----
746            (DataType::Jsonb, ValueType::String)
747                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
748            {
749                // Check if this value is the Debezium unavailable value (TOAST handling for postgres-cdc).
750                // Debezium will base64 encode the bytea type placeholder.
751                // When a placeholder is encountered, it is converted into a jsonb format placeholder to match the original type.
752                match self.handle_toast_columns {
753                    true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
754                        .map_err(|_| create_error())?
755                        .into(),
756                    false => JsonbVal::from_str(value.as_str().unwrap())
757                        .map_err(|_| create_error())?
758                        .into(),
759                }
760            }
761            (DataType::Jsonb, _)
762                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
763            {
764                let value: serde_json::Value =
765                    value.clone().try_into().map_err(|_| create_error())?;
766                JsonbVal::from(value).into()
767            }
768            // ---- Int256 -----
769            (
770                DataType::Int256,
771                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
772            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
773
774            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
775                .map_err(|_| create_error())?
776                .into(),
777
778            (_expected, _got) => Err(create_error())?,
779        };
780        Ok(DatumCow::Owned(Some(v)))
781    }
782}
783
784/// Try to decode a base64-encoded decimal string for unsigned bigint handling in Precise mode.
785///
786/// This is used when processing CDC data from upstream systems with unsigned bigint (e.g., MySQL CDC).
787/// When users configure `debezium.bigint.unsigned.handling.mode='precise'`, Debezium converts
788/// unsigned bigint to base64-encoded decimal.
789///
790/// Reference: <https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode>.
791fn try_base64_decode_decimal(
792    str_val: &str,
793    bigint_unsigned_handling: BigintUnsignedHandlingMode,
794    create_error: impl Fn() -> AccessError,
795) -> Result<Decimal, AccessError> {
796    match bigint_unsigned_handling {
797        BigintUnsignedHandlingMode::Precise => {
798            // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema
799            // instead of string, as described in <https://github.com/risingwavelabs/risingwave/issues/16852>.
800            // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this
801            // after implementing that functionality.
802            let value = base64::engine::general_purpose::STANDARD
803                .decode(str_val)
804                .map_err(|_| create_error())?;
805            let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value);
806            Decimal::from_str(&unscaled.to_string()).map_err(|_| create_error())
807        }
808        BigintUnsignedHandlingMode::Long => {
809            // In Long mode, don't attempt base64 decoding
810            Err(create_error())
811        }
812    }
813}
814
815pub struct JsonAccess<'a> {
816    value: BorrowedValue<'a>,
817    options: &'a JsonParseOptions,
818}
819
820impl<'a> JsonAccess<'a> {
821    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
822        Self { value, options }
823    }
824
825    pub fn new(value: BorrowedValue<'a>) -> Self {
826        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
827    }
828}
829
830impl Access for JsonAccess<'_> {
831    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
832        let mut value = &self.value;
833
834        for (idx, &key) in path.iter().enumerate() {
835            if let Some(sub_value) = if self.options.ignoring_keycase {
836                json_object_get_case_insensitive(value, key)
837            } else {
838                value.get(key)
839            } {
840                value = sub_value;
841            } else {
842                Err(AccessError::Undefined {
843                    name: key.to_owned(),
844                    path: path.iter().take(idx).join("."),
845                })?;
846            }
847        }
848
849        self.options.parse(value, type_expected)
850    }
851}
852
853/// Get a value from a json object by key, case insensitive.
854///
855/// Returns `None` if the given json value is not an object, or the key is not found.
856fn json_object_get_case_insensitive<'b>(
857    v: &'b simd_json::BorrowedValue<'b>,
858    key: &str,
859) -> Option<&'b simd_json::BorrowedValue<'b>> {
860    let obj = v.as_object()?;
861    let value = obj.get(key);
862    if value.is_some() {
863        return value; // fast path
864    }
865    for (k, v) in obj {
866        if k.eq_ignore_ascii_case(key) {
867            return Some(v);
868        }
869    }
870    None
871}