risingwave_connector/parser/unified/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::LazyLock;
17
18use base64::Engine;
19use itertools::Itertools;
20use num_bigint::BigInt;
21use risingwave_common::array::{ListValue, StructValue};
22use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{
25    DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
26    ToOwnedDatum,
27};
28use risingwave_connector_codec::decoder::utils::scaled_bigint_to_rust_decimal;
29use simd_json::base::ValueAsObject;
30use simd_json::prelude::{
31    TypedValue, ValueAsArray, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
32};
33use simd_json::{BorrowedValue, ValueType};
34use thiserror_ext::AsReport;
35
36use super::{Access, AccessError, AccessResult};
37use crate::parser::DatumCow;
38use crate::schema::{InvalidOptionError, bail_invalid_option_error};
39
/// How a JSON string is decoded when the expected type is `bytea`.
#[derive(Clone, Debug)]
pub enum ByteaHandling {
    /// Decode the string with `str_to_bytea`.
    Standard,
    /// Decode the string as base64 (standard alphabet).
    ///
    /// Debezium converts postgres bytea to base64 format.
    Base64,
}
/// Unit assumed for an integer JSON value when the expected type is `time`.
#[derive(Clone, Debug)]
pub enum TimeHandling {
    /// The integer is passed to `Time::with_milli`.
    Milli,
    /// The integer is passed to `Time::with_micro`.
    Micro,
}
51
/// Accepted representation of a `timestamptz` value in JSON.
///
/// The string modes reject integer input; the numeric modes still accept
/// strings, which are then parsed with the `Timestamptz` string parser.
#[derive(Clone, Debug)]
pub enum TimestamptzHandling {
    /// `"2024-04-11T02:00:00.123456Z"`
    UtcString,
    /// `"2024-04-11 02:00:00.123456"`
    UtcWithoutSuffix,
    /// `1712800800123`
    Milli,
    /// `1712800800123456`
    Micro,
    /// Both `1712800800123` (ms) and `1712800800123456` (us) maps to `2024-04-11`.
    ///
    /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`.
    ///
    /// This option is backward compatible.
    GuessNumberUnit,
}
69
70impl TimestamptzHandling {
71    pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";
72
73    pub fn from_options(
74        options: &std::collections::BTreeMap<String, String>,
75    ) -> Result<Option<Self>, InvalidOptionError> {
76        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
77            Some("utc_string") => Self::UtcString,
78            Some("utc_without_suffix") => Self::UtcWithoutSuffix,
79            Some("micro") => Self::Micro,
80            Some("milli") => Self::Milli,
81            Some("guess_number_unit") => Self::GuessNumberUnit,
82            Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v),
83            None => return Ok(None),
84        };
85        Ok(Some(mode))
86    }
87}
88
/// Unit assumed for an integer JSON value when the expected type is `timestamp`.
#[derive(Clone, Debug)]
pub enum TimestampHandling {
    /// The integer is passed to `Timestamp::with_millis`.
    Milli,
    /// The integer is converted with `i64_to_timestamp`.
    GuessNumberUnit,
}
94
/// How a JSON value is accepted when the expected type is `jsonb`.
#[derive(Clone, Debug)]
pub enum JsonValueHandling {
    /// Any JSON value is converted into a jsonb value as-is.
    AsValue,
    /// Only a JSON string is accepted; its content is parsed as jsonb text.
    AsString,
}
/// How permissive the parser is when the expected type is numeric.
#[derive(Clone, Debug)]
pub enum NumericHandling {
    /// Integers are not parsed to floats, and strings are not parsed to numbers.
    Strict,
    /// Integers may be parsed to floats.
    Relax {
        /// Whether a string such as `"3.14"` should be parsed to the number `3.14`.
        string_parsing: bool,
    },
}
/// How permissive the parser is when the expected type is `boolean`.
#[derive(Clone, Debug)]
pub enum BooleanHandling {
    /// Only JSON booleans are accepted.
    Strict,
    /// Integers `1` and `0` are also parsed to boolean (debezium).
    Relax {
        /// Whether strings such as `"True"` / `"False"` (compared after
        /// lowercasing) should be parsed to `true` / `false`.
        string_parsing: bool,
        /// Whether strings `"1"` / `"0"` should be parsed to boolean (canal + mysql).
        ///
        /// Only consulted when `string_parsing` is also `true`.
        string_integer_parsing: bool,
    },
}
120
/// Which non-string JSON values may be cast to `varchar`.
#[derive(Clone, Debug)]
pub enum VarcharHandling {
    /// Do not allow other types to be cast to varchar.
    Strict,
    /// Allow scalar JSON values (Bool, I64, I128, U64, U128, F64) to be cast to varchar.
    OnlyPrimaryTypes,
    /// Allow all types to be cast to varchar (including Array and Object).
    AllTypes,
}
130
/// Which JSON values may be parsed to a struct.
#[derive(Clone, Debug)]
pub enum StructHandling {
    /// Only allow a JSON object to be parsed to a struct.
    Strict,
    /// Also allow a string containing a serialized JSON object
    /// (like `"{\"a\": 1, \"b\": 2}"`) to be parsed to a struct.
    AllowJsonString,
}
139
/// Per-type switches that control how `JsonParseOptions::parse` converts a
/// JSON value into a datum of an expected type.
#[derive(Clone, Debug)]
pub struct JsonParseOptions {
    /// Decoding of strings into `bytea`.
    pub bytea_handling: ByteaHandling,
    /// Unit of integers parsed as `time`.
    pub time_handling: TimeHandling,
    /// Unit of integers parsed as `timestamp`.
    pub timestamp_handling: TimestampHandling,
    /// Accepted representation of `timestamptz` values.
    pub timestamptz_handling: TimestamptzHandling,
    /// Whether `jsonb` is taken from any JSON value or only from a JSON string.
    pub json_value_handling: JsonValueHandling,
    /// Leniency for numeric types.
    pub numeric_handling: NumericHandling,
    /// Leniency for `boolean`.
    pub boolean_handling: BooleanHandling,
    /// Which non-string values may become `varchar`.
    pub varchar_handling: VarcharHandling,
    /// Whether a struct may be given as a JSON-encoded string.
    pub struct_handling: StructHandling,
    /// When `true`, `JsonAccess::access` resolves path keys case-insensitively.
    /// Nested struct fields inside `parse` are looked up case-insensitively
    /// regardless of this flag.
    pub ignoring_keycase: bool,
    /// When `true` (and jsonb is read from strings), jsonb strings are parsed
    /// with `JsonbVal::from_debezium_unavailable_value` instead of
    /// `JsonbVal::from_str`.
    pub handle_toast_columns: bool,
}
154
155impl Default for JsonParseOptions {
156    fn default() -> Self {
157        Self::DEFAULT.clone()
158    }
159}
160
161impl JsonParseOptions {
    /// Preset with relaxed boolean parsing (integers as well as
    /// `"true"`/`"false"` and `"1"`/`"0"` strings), strict varchar handling and
    /// strict struct handling.
    ///
    /// NOTE(review): presumably intended for Canal-format sources — confirm
    /// against callers.
    pub const CANAL: JsonParseOptions = JsonParseOptions {
        bytea_handling: ByteaHandling::Standard,
        time_handling: TimeHandling::Micro,
        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
        json_value_handling: JsonValueHandling::AsValue,
        numeric_handling: NumericHandling::Relax {
            string_parsing: true,
        },
        boolean_handling: BooleanHandling::Relax {
            string_parsing: true,
            string_integer_parsing: true,
        },
        varchar_handling: VarcharHandling::Strict,
        struct_handling: StructHandling::Strict,
        ignoring_keycase: true,
        handle_toast_columns: false,
    };
    /// Preset returned by `Default::default` and used by `JsonAccess::new`.
    ///
    /// Booleans are strict, scalar JSON values may be cast to varchar, and a
    /// struct may be supplied as a JSON-encoded string.
    pub const DEFAULT: JsonParseOptions = JsonParseOptions {
        bytea_handling: ByteaHandling::Standard,
        time_handling: TimeHandling::Micro,
        timestamp_handling: TimestampHandling::GuessNumberUnit, // backward-compatible
        timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
        json_value_handling: JsonValueHandling::AsValue,
        numeric_handling: NumericHandling::Relax {
            string_parsing: true,
        },
        boolean_handling: BooleanHandling::Strict,
        varchar_handling: VarcharHandling::OnlyPrimaryTypes,
        struct_handling: StructHandling::AllowJsonString,
        ignoring_keycase: true,
        handle_toast_columns: false,
    };
195
196    pub fn new_for_debezium(
197        timestamptz_handling: TimestamptzHandling,
198        timestamp_handling: TimestampHandling,
199        time_handling: TimeHandling,
200        handle_toast_columns: bool,
201    ) -> Self {
202        Self {
203            bytea_handling: ByteaHandling::Base64,
204            time_handling,
205            timestamp_handling,
206            timestamptz_handling,
207            json_value_handling: JsonValueHandling::AsString,
208            numeric_handling: NumericHandling::Relax {
209                string_parsing: false,
210            },
211            boolean_handling: BooleanHandling::Relax {
212                string_parsing: false,
213                string_integer_parsing: false,
214            },
215            varchar_handling: VarcharHandling::Strict,
216            struct_handling: StructHandling::Strict,
217            ignoring_keycase: true,
218            handle_toast_columns,
219        }
220    }
221
222    pub fn parse<'a>(
223        &self,
224        value: &'a BorrowedValue<'a>,
225        type_expected: &DataType,
226    ) -> AccessResult<DatumCow<'a>> {
227        let create_error = || AccessError::TypeError {
228            expected: format!("{:?}", type_expected),
229            got: value.value_type().to_string(),
230            value: value.to_string(),
231        };
232        let v: ScalarImpl = match (type_expected, value.value_type()) {
233            (_, ValueType::Null) => return Ok(DatumCow::NULL),
234            // ---- Boolean -----
235            (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),
236
237            (
238                DataType::Boolean,
239                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
240            ) if matches!(self.boolean_handling, BooleanHandling::Relax { .. })
241                && matches!(value.as_i64(), Some(0i64) | Some(1i64)) =>
242            {
243                (value.as_i64() == Some(1i64)).into()
244            }
245
246            (DataType::Boolean, ValueType::String)
247                if matches!(
248                    self.boolean_handling,
249                    BooleanHandling::Relax {
250                        string_parsing: true,
251                        ..
252                    }
253                ) =>
254            {
255                match value.as_str().unwrap().to_lowercase().as_str() {
256                    "true" => true.into(),
257                    "false" => false.into(),
258                    c @ ("1" | "0")
259                        if matches!(
260                            self.boolean_handling,
261                            BooleanHandling::Relax {
262                                string_parsing: true,
263                                string_integer_parsing: true
264                            }
265                        ) =>
266                    {
267                        if c == "1" {
268                            true.into()
269                        } else {
270                            false.into()
271                        }
272                    }
273                    _ => Err(create_error())?,
274                }
275            }
276            // ---- Int16 -----
277            (
278                DataType::Int16,
279                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
280            ) => value.try_as_i16().map_err(|_| create_error())?.into(),
281
282            (DataType::Int16, ValueType::String)
283                if matches!(
284                    self.numeric_handling,
285                    NumericHandling::Relax {
286                        string_parsing: true
287                    }
288                ) =>
289            {
290                value
291                    .as_str()
292                    .unwrap()
293                    .parse::<i16>()
294                    .map_err(|_| create_error())?
295                    .into()
296            }
297            // ---- Int32 -----
298            (
299                DataType::Int32,
300                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
301            ) => value.try_as_i32().map_err(|_| create_error())?.into(),
302
303            (DataType::Int32, ValueType::String)
304                if matches!(
305                    self.numeric_handling,
306                    NumericHandling::Relax {
307                        string_parsing: true
308                    }
309                ) =>
310            {
311                value
312                    .as_str()
313                    .unwrap()
314                    .parse::<i32>()
315                    .map_err(|_| create_error())?
316                    .into()
317            }
318            // ---- Int64 -----
319            (
320                DataType::Int64,
321                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
322            ) => value.try_as_i64().map_err(|_| create_error())?.into(),
323
324            (DataType::Int64, ValueType::String)
325                if matches!(
326                    self.numeric_handling,
327                    NumericHandling::Relax {
328                        string_parsing: true
329                    }
330                ) =>
331            {
332                value
333                    .as_str()
334                    .unwrap()
335                    .parse::<i64>()
336                    .map_err(|_| create_error())?
337                    .into()
338            }
339            // ---- Float32 -----
340            (
341                DataType::Float32,
342                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
343            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
344                (value.try_as_i64().map_err(|_| create_error())? as f32).into()
345            }
346            (DataType::Float32, ValueType::String)
347                if matches!(
348                    self.numeric_handling,
349                    NumericHandling::Relax {
350                        string_parsing: true
351                    }
352                ) =>
353            {
354                value
355                    .as_str()
356                    .unwrap()
357                    .parse::<f32>()
358                    .map_err(|_| create_error())?
359                    .into()
360            }
361            (DataType::Float32, ValueType::F64) => {
362                value.try_as_f32().map_err(|_| create_error())?.into()
363            }
364            // ---- Float64 -----
365            (
366                DataType::Float64,
367                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
368            ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => {
369                (value.try_as_i64().map_err(|_| create_error())? as f64).into()
370            }
371            (DataType::Float64, ValueType::String)
372                if matches!(
373                    self.numeric_handling,
374                    NumericHandling::Relax {
375                        string_parsing: true
376                    }
377                ) =>
378            {
379                value
380                    .as_str()
381                    .unwrap()
382                    .parse::<f64>()
383                    .map_err(|_| create_error())?
384                    .into()
385            }
386            (DataType::Float64, ValueType::F64) => {
387                value.try_as_f64().map_err(|_| create_error())?.into()
388            }
389            // ---- Decimal -----
390            (DataType::Decimal, ValueType::I128 | ValueType::U128) => {
391                Decimal::from_str(&value.try_as_i128().map_err(|_| create_error())?.to_string())
392                    .map_err(|_| create_error())?
393                    .into()
394            }
395            (DataType::Decimal, ValueType::I64 | ValueType::U64) => {
396                Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into()
397            }
398
399            (DataType::Decimal, ValueType::F64) => {
400                Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?)
401                    .map_err(|_| create_error())?
402                    .into()
403            }
404            (DataType::Decimal, ValueType::String) => {
405                let str = value.as_str().unwrap();
406                // the following values are special string generated by Debezium and should be handled separately
407                match str {
408                    "NAN" => ScalarImpl::Decimal(Decimal::NaN),
409                    "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf),
410                    "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf),
411                    _ => {
412                        ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?)
413                    }
414                }
415            }
416            (DataType::Decimal, ValueType::Object) => {
417                // ref https://github.com/risingwavelabs/risingwave/issues/10628
418                // handle debezium json (variable scale): {"scale": int, "value": bytes}
419                let scale = value
420                    .get("scale")
421                    .ok_or_else(create_error)?
422                    .as_i32()
423                    .unwrap();
424                let value = value
425                    .get("value")
426                    .ok_or_else(create_error)?
427                    .as_str()
428                    .unwrap()
429                    .as_bytes();
430                let unscaled = BigInt::from_signed_bytes_be(value);
431                let decimal = scaled_bigint_to_rust_decimal(unscaled, scale as _)?;
432                ScalarImpl::Decimal(Decimal::Normalized(decimal))
433            }
434            // ---- Date -----
435            (
436                DataType::Date,
437                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
438            ) => Date::with_days_since_unix_epoch(value.try_as_i32().map_err(|_| create_error())?)
439                .map_err(|_| create_error())?
440                .into(),
441            (DataType::Date, ValueType::String) => value
442                .as_str()
443                .unwrap()
444                .parse::<Date>()
445                .map_err(|_| create_error())?
446                .into(),
447            // ---- Varchar -----
448            (DataType::Varchar, ValueType::String) => {
449                return Ok(DatumCow::Borrowed(Some(value.as_str().unwrap().into())));
450            }
451            (
452                DataType::Varchar,
453                ValueType::Bool
454                | ValueType::I64
455                | ValueType::I128
456                | ValueType::U64
457                | ValueType::U128
458                | ValueType::F64,
459            ) if matches!(self.varchar_handling, VarcharHandling::OnlyPrimaryTypes) => {
460                value.to_string().into()
461            }
462            (
463                DataType::Varchar,
464                ValueType::Bool
465                | ValueType::I64
466                | ValueType::I128
467                | ValueType::U64
468                | ValueType::U128
469                | ValueType::F64
470                | ValueType::Array
471                | ValueType::Object,
472            ) if matches!(self.varchar_handling, VarcharHandling::AllTypes) => {
473                value.to_string().into()
474            }
475            // ---- Time -----
476            (DataType::Time, ValueType::String) => value
477                .as_str()
478                .unwrap()
479                .parse::<Time>()
480                .map_err(|_| create_error())?
481                .into(),
482            (
483                DataType::Time,
484                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
485            ) => value
486                .as_i64()
487                .map(|i| match self.time_handling {
488                    TimeHandling::Milli => Time::with_milli(i as u32),
489                    TimeHandling::Micro => Time::with_micro(i as u64),
490                })
491                .unwrap()
492                .map_err(|_| create_error())?
493                .into(),
494            // ---- Timestamp -----
495            (DataType::Timestamp, ValueType::String) => value
496                .as_str()
497                .unwrap()
498                .parse::<Timestamp>()
499                .map_err(|_| create_error())?
500                .into(),
501            (
502                DataType::Timestamp,
503                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
504            ) => {
505                match self.timestamp_handling {
506                    // Only when user configures debezium.time.precision.mode = 'connect',
507                    // the Milli branch will be executed
508                    TimestampHandling::Milli => Timestamp::with_millis(value.as_i64().unwrap())
509                        .map_err(|_| create_error())?
510                        .into(),
511                    TimestampHandling::GuessNumberUnit => i64_to_timestamp(value.as_i64().unwrap())
512                        .map_err(|_| create_error())?
513                        .into(),
514                }
515            }
516            // ---- Timestamptz -----
517            (DataType::Timestamptz, ValueType::String) => match self.timestamptz_handling {
518                TimestamptzHandling::UtcWithoutSuffix => value
519                    .as_str()
520                    .unwrap()
521                    .parse::<Timestamp>()
522                    .map(|naive_utc| {
523                        Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
524                    })
525                    .map_err(|_| create_error())?
526                    .into(),
527                // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
528                _ => value
529                    .as_str()
530                    .unwrap()
531                    .parse::<Timestamptz>()
532                    .map_err(|_| create_error())?
533                    .into(),
534            },
535            (
536                DataType::Timestamptz,
537                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
538            ) => value
539                .as_i64()
540                .and_then(|num| match self.timestamptz_handling {
541                    TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(),
542                    TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)),
543                    TimestamptzHandling::Milli => Timestamptz::from_millis(num),
544                    // When explicitly requested string format, number without units are rejected.
545                    TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None,
546                })
547                .ok_or_else(create_error)?
548                .into(),
549            // ---- Interval -----
550            (DataType::Interval, ValueType::String) => value
551                .as_str()
552                .unwrap()
553                .parse::<Interval>()
554                .map_err(|_| create_error())?
555                .into(),
556            // ---- Struct -----
557            (DataType::Struct(struct_type_info), ValueType::Object) => {
558                // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
559                // https://github.com/rust-lang/rust/issues/48994
560                let mut fields = Vec::with_capacity(struct_type_info.len());
561                for (field_name, field_type) in struct_type_info.iter() {
562                    let field_value = json_object_get_case_insensitive(value, field_name)
563                            .unwrap_or_else(|| {
564                                let error = AccessError::Undefined {
565                                    name: field_name.to_owned(),
566                                    path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
567                                };
568                                // TODO: is it possible to unify the logging with the one in `do_action`?
569                                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =  LazyLock::new(LogSuppresser::default);
570                                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
571                                    tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
572                                }
573                                &BorrowedValue::Static(simd_json::StaticNode::Null)
574                            });
575                    fields.push(
576                        self.parse(field_value, field_type)
577                            .map(|d| d.to_owned_datum())?,
578                    );
579                }
580                StructValue::new(fields).into()
581            }
582
583            // String containing json object, e.g. "{\"a\": 1, \"b\": 2}"
584            // Try to parse it as json object.
585            (DataType::Struct(_), ValueType::String)
586                if matches!(self.struct_handling, StructHandling::AllowJsonString) =>
587            {
588                // TODO: avoid copy by accepting `&mut BorrowedValue` in `parse` method.
589                let mut value = value.as_str().unwrap().as_bytes().to_vec();
590                let value =
591                    simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
592                return self
593                    .parse(&value, type_expected)
594                    .map(|d| d.to_owned_datum().into());
595            }
596
597            // ---- List -----
598            (DataType::List(item_type), ValueType::Array) => ListValue::new({
599                let array = value.as_array().unwrap();
600                let mut builder = item_type.create_array_builder(array.len());
601                for v in array {
602                    let value = self.parse(v, item_type)?;
603                    builder.append(value);
604                }
605                builder.finish()
606            })
607            .into(),
608
609            // ---- Bytea -----
610            (DataType::Bytea, ValueType::String) => {
611                let value_str = value.as_str().unwrap();
612
613                match self.bytea_handling {
614                    ByteaHandling::Standard => {
615                        str_to_bytea(value_str).map_err(|_| create_error())?.into()
616                    }
617                    ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD
618                        .decode(value_str)
619                        .map_err(|_| create_error())?
620                        .into_boxed_slice()
621                        .into(),
622                }
623            }
624            // ---- Jsonb -----
625            (DataType::Jsonb, ValueType::String)
626                if matches!(self.json_value_handling, JsonValueHandling::AsString) =>
627            {
628                // Check if this value is the Debezium unavailable value (TOAST handling for postgres-cdc).
629                // Debezium will base64 encode the bytea type placeholder.
630                // When a placeholder is encountered, it is converted into a jsonb format placeholder to match the original type.
631                match self.handle_toast_columns {
632                    true => JsonbVal::from_debezium_unavailable_value(value.as_str().unwrap())
633                        .map_err(|_| create_error())?
634                        .into(),
635                    false => JsonbVal::from_str(value.as_str().unwrap())
636                        .map_err(|_| create_error())?
637                        .into(),
638                }
639            }
640            (DataType::Jsonb, _)
641                if matches!(self.json_value_handling, JsonValueHandling::AsValue) =>
642            {
643                let value: serde_json::Value =
644                    value.clone().try_into().map_err(|_| create_error())?;
645                JsonbVal::from(value).into()
646            }
647            // ---- Int256 -----
648            (
649                DataType::Int256,
650                ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128,
651            ) => Int256::from(value.try_as_i64().map_err(|_| create_error())?).into(),
652
653            (DataType::Int256, ValueType::String) => Int256::from_str(value.as_str().unwrap())
654                .map_err(|_| create_error())?
655                .into(),
656
657            (_expected, _got) => Err(create_error())?,
658        };
659        Ok(DatumCow::Owned(Some(v)))
660    }
661}
662
/// An `Access` implementation over a parsed JSON value.
pub struct JsonAccess<'a> {
    /// Root JSON value that access paths are resolved against.
    value: BorrowedValue<'a>,
    /// Options used for key lookup and for converting the resolved value.
    options: &'a JsonParseOptions,
}
667
668impl<'a> JsonAccess<'a> {
669    pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
670        Self { value, options }
671    }
672
673    pub fn new(value: BorrowedValue<'a>) -> Self {
674        Self::new_with_options(value, &JsonParseOptions::DEFAULT)
675    }
676}
677
678impl Access for JsonAccess<'_> {
679    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
680        let mut value = &self.value;
681
682        for (idx, &key) in path.iter().enumerate() {
683            if let Some(sub_value) = if self.options.ignoring_keycase {
684                json_object_get_case_insensitive(value, key)
685            } else {
686                value.get(key)
687            } {
688                value = sub_value;
689            } else {
690                Err(AccessError::Undefined {
691                    name: key.to_owned(),
692                    path: path.iter().take(idx).join("."),
693                })?;
694            }
695        }
696
697        self.options.parse(value, type_expected)
698    }
699}
700
701/// Get a value from a json object by key, case insensitive.
702///
703/// Returns `None` if the given json value is not an object, or the key is not found.
704fn json_object_get_case_insensitive<'b>(
705    v: &'b simd_json::BorrowedValue<'b>,
706    key: &str,
707) -> Option<&'b simd_json::BorrowedValue<'b>> {
708    let obj = v.as_object()?;
709    let value = obj.get(key);
710    if value.is_some() {
711        return value; // fast path
712    }
713    for (k, v) in obj {
714        if k.eq_ignore_ascii_case(key) {
715            return Some(v);
716        }
717    }
718    None
719}