risingwave_connector_codec/decoder/avro/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25    DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26    Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
34#[derive(Clone)]
35/// Options for parsing an `AvroValue` into Datum, with a root avro schema.
36pub struct AvroParseOptions<'a> {
37    /// The avro schema at root level
38    root_schema: &'a Schema,
39    /// The immutable "global" context during recursive parsing
40    inner: AvroParseOptionsInner<'a>,
41}
42
43#[derive(Clone)]
44/// Options for parsing an `AvroValue` into Datum, with names resolved from root schema.
45struct AvroParseOptionsInner<'a> {
46    /// Mapping from type names to actual schema
47    refs: NamesRef<'a>,
48    /// Strict Mode
49    /// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
50    relax_numeric: bool,
51}
52
53impl<'a> AvroParseOptions<'a> {
54    pub fn create(root_schema: &'a Schema) -> Self {
55        let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56            .expect("avro schema is self contained");
57        Self {
58            root_schema,
59            inner: AvroParseOptionsInner {
60                refs: resolved.get_names().clone(),
61                relax_numeric: true,
62            },
63        }
64    }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68    fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69        match schema {
70            Schema::Ref { name } => self.refs[name],
71            _ => schema,
72        }
73    }
74
75    /// Parse an avro value into expected type.
76    ///
77    /// 3 kinds of type info are used to parsing:
78    /// - `type_expected`. The type that we expect the value is.
79    /// - value type. The type info together with the value argument.
80    /// - schema. The `AvroSchema` provided in option.
81    ///
82    /// Cases: (FIXME: Is this precise?)
83    /// - If both `type_expected` and schema are provided, it will check both strictly.
84    /// - If only `type_expected` is provided, it will try to match the value type
85    ///   and the `type_expected`, converting the value if possible.
86    /// - If only value is provided (without schema and `type_expected`),
87    ///   the `DataType` will be inferred.
88    fn convert_to_datum<'b>(
89        &self,
90        unresolved_schema: &'a Schema,
91        value: &'b Value,
92        type_expected: &DataType,
93    ) -> AccessResult<DatumCow<'b>>
94    where
95        'b: 'a,
96    {
97        let create_error = || AccessError::TypeError {
98            expected: format!("{:?}", type_expected),
99            got: format!("{:?}", value),
100            value: String::new(),
101        };
102
103        macro_rules! borrowed {
104            ($v:expr) => {
105                return Ok(DatumCow::Borrowed(Some($v.into())))
106            };
107        }
108
109        let v: ScalarImpl = match (type_expected, value) {
110            (_, Value::Null) => return Ok(DatumCow::NULL),
111            // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) -----
112            (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114                    // XXX: Is this branch actually unreachable? (if self.schema is correctly used)
115                    return Err(create_error());
116                };
117
118                if let Some(inner) = get_nullable_union_inner(u) {
119                    // nullable Union ([null, record])
120                    return self.convert_to_datum(inner, v, type_expected);
121                }
122                let variant_schema = &u.variants()[*variant as usize];
123
124                if matches!(variant_schema, &Schema::Null) {
125                    return Ok(DatumCow::NULL);
126                }
127
128                // Here we compare the field name, instead of using the variant idx to find the field idx.
129                // The latter approach might also work, but might be more error-prone.
130                // We will need to get the index of the "null" variant, and then re-map the variant index to the field index.
131                // XXX: probably we can unwrap here (if self.schema is correctly used)
132                let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134                let mut fields = Vec::with_capacity(struct_type_info.len());
135                for (field_name, field_type) in struct_type_info.iter() {
136                    if field_name == expected_field_name {
137                        let datum = self
138                            .convert_to_datum(variant_schema, v, field_type)?
139                            .to_owned_datum();
140
141                        fields.push(datum)
142                    } else {
143                        fields.push(None)
144                    }
145                }
146                StructValue::new(fields).into()
147            }
148            // nullable Union ([null, T])
149            (_, Value::Union(_, v)) => {
150                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151                    return Err(create_error());
152                };
153                let Some(schema) = get_nullable_union_inner(u) else {
154                    return Err(create_error());
155                };
156                return self.convert_to_datum(schema, v, type_expected);
157            }
158            // ---- Boolean -----
159            (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160            // ---- Int16 -----
161            (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162            (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164            // ---- Int32 -----
165            (DataType::Int32, Value::Int(i)) => (*i).into(),
166            (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167            // ---- Int64 -----
168            (DataType::Int64, Value::Long(i)) => (*i).into(),
169            (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170            // ---- Float32 -----
171            (DataType::Float32, Value::Float(i)) => (*i).into(),
172            (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173            // ---- Float64 -----
174            (DataType::Float64, Value::Double(i)) => (*i).into(),
175            (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176            // ---- Decimal -----
177            (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178                let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179                    Schema::Decimal(DecimalSchema {
180                        precision, scale, ..
181                    }) => (*precision, *scale),
182                    _ => Err(create_error())?,
183                };
184                let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185                    .map_err(|_| create_error())?;
186                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187            }
188            (DataType::Decimal, Value::Record(fields)) => {
189                // VariableScaleDecimal has fixed fields, scale(int) and value(bytes)
190                let find_in_records = |field_name: &str| {
191                    fields
192                        .iter()
193                        .find(|field| field.0 == field_name)
194                        .map(|field| &field.1)
195                        .ok_or_else(|| {
196                            uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197                        })
198                };
199                let scale = match find_in_records("scale")? {
200                    Value::Int(scale) => *scale,
201                    avro_value => bail_uncategorized!(
202                        "scale field in VariableScaleDecimal is not int, got {:?}",
203                        avro_value
204                    ),
205                };
206
207                let value: BigInt = match find_in_records("value")? {
208                    Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209                    avro_value => bail_uncategorized!(
210                        "value field in VariableScaleDecimal is not bytes, got {:?}",
211                        avro_value
212                    ),
213                };
214
215                let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217            }
218            // ---- Time -----
219            (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220                .map_err(|_| create_error())?
221                .into(),
222            (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223                .map_err(|_| create_error())?
224                .into(),
225            // ---- Date -----
226            (DataType::Date, Value::Date(days)) => {
227                Date::with_days_since_ce(days + unix_epoch_days())
228                    .map_err(|_| create_error())?
229                    .into()
230            }
231            // ---- Varchar -----
232            (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233            (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234            // ---- Timestamp -----
235            (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236                .map_err(|_| create_error())?
237                .into(),
238            (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239                .map_err(|_| create_error())?
240                .into(),
241
242            // ---- TimestampTz -----
243            (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244                .ok_or_else(|| {
245                    uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246                })?
247                .into(),
248            (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249                Timestamptz::from_micros(*us).into()
250            }
251
252            // ---- Interval -----
253            (DataType::Interval, Value::Duration(duration)) => {
254                let months = u32::from(duration.months()) as i32;
255                let days = u32::from(duration.days()) as i32;
256                let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
257                ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258            }
259            // ---- Struct -----
260            (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261                let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262                    return Err(create_error());
263                };
264                struct_type_info
265                    .iter()
266                    .map(|(field_name, field_type)| {
267                        if let Some(idx) = record_schema.lookup.get(field_name) {
268                            let value = &descs[*idx].1;
269                            let schema = &record_schema.fields[*idx].schema;
270                            Ok(self
271                                .convert_to_datum(schema, value, field_type)?
272                                .to_owned_datum())
273                        } else {
274                            Ok(None)
275                        }
276                    })
277                    .collect::<Result<_, AccessError>>()?
278            })
279            .into(),
280            // ---- List -----
281            (DataType::List(list_type), Value::Array(array)) => ListValue::new({
282                let Schema::Array(array_schema) = self.lookup_ref(unresolved_schema) else {
283                    return Err(create_error());
284                };
285                let schema = &array_schema.items;
286                let elem_type = list_type.elem();
287                let mut builder = elem_type.create_array_builder(array.len());
288                for v in array {
289                    let value = self.convert_to_datum(schema, v, elem_type)?;
290                    builder.append(value);
291                }
292                builder.finish()
293            })
294            .into(),
295            // ---- Bytea -----
296            (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
297            // ---- Jsonb -----
298            (DataType::Jsonb, v @ Value::Map(_)) => {
299                let mut builder = jsonbb::Builder::default();
300                avro_to_jsonb(v, &mut builder)?;
301                let jsonb = builder.finish();
302                debug_assert!(jsonb.as_ref().is_object());
303                JsonbVal::from(jsonb).into()
304            }
305            (DataType::Varchar, Value::Uuid(uuid)) => {
306                uuid.as_hyphenated().to_string().into_boxed_str().into()
307            }
308            (DataType::Map(map_type), Value::Map(map)) => {
309                let Schema::Map(map_schema) = self.lookup_ref(unresolved_schema) else {
310                    return Err(create_error());
311                };
312                let schema = &map_schema.types;
313                let mut builder = map_type
314                    .clone()
315                    .into_struct()
316                    .create_array_builder(map.len());
317                // Since the map is HashMap, we can ensure
318                // key is non-null and unique, keys and values have the same length.
319
320                // NOTE: HashMap's iter order is non-deterministic, but MapValue's
321                // order matters. We sort by key here to have deterministic order
322                // in tests. We might consider removing this, or make all MapValue sorted
323                // in the future.
324                for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
325                    let value_datum = self
326                        .convert_to_datum(schema, v, map_type.value())?
327                        .to_owned_datum();
328                    builder.append(
329                        StructValue::new(vec![Some(k.as_str().into()), value_datum])
330                            .to_owned_datum(),
331                    );
332                }
333                let list = ListValue::new(builder.finish());
334                MapValue::from_entries(list).into()
335            }
336
337            (_expected, _got) => Err(create_error())?,
338        };
339        Ok(DatumCow::Owned(Some(v)))
340    }
341}
342
/// An [`Access`] implementation that reads fields out of a decoded avro [`Value`].
pub struct AvroAccess<'a> {
    /// Root avro value that path lookups start from.
    value: &'a Value,
    /// Root schema and resolved names used to interpret `value`.
    options: AvroParseOptions<'a>,
}
347
348impl<'a> AvroAccess<'a> {
349    pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
350        Self {
351            value: root_value,
352            options,
353        }
354    }
355}
356
357impl Access for AvroAccess<'_> {
358    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
359        let mut value = self.value;
360        let mut unresolved_schema = self.options.root_schema;
361
362        debug_assert!(
363            path.len() == 1
364                || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
365            "unexpected path access: {:?}",
366            path
367        );
368        let mut i = 0;
369        while i < path.len() {
370            let key = path[i];
371            let create_error = || AccessError::Undefined {
372                name: key.to_owned(),
373                path: path.iter().take(i).join("."),
374            };
375            match value {
376                Value::Union(_, v) => {
377                    // The debezium "before" field is a nullable union.
378                    // "fields": [
379                    // {
380                    //     "name": "before",
381                    //     "type": [
382                    //         "null",
383                    //         {
384                    //             "type": "record",
385                    //             "name": "Value",
386                    //             "fields": [...],
387                    //         }
388                    //     ],
389                    //     "default": null
390                    // },
391                    // {
392                    //     "name": "after",
393                    //     "type": [
394                    //         "null",
395                    //         "Value"
396                    //     ],
397                    //     "default": null
398                    // },
399                    // ...]
400                    value = v;
401                    let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
402                        return Err(create_error());
403                    };
404                    let Some(schema) = get_nullable_union_inner(u) else {
405                        return Err(create_error());
406                    };
407                    unresolved_schema = schema;
408                    continue;
409                }
410                Value::Record(fields) => {
411                    let Schema::Record(record_schema) =
412                        self.options.inner.lookup_ref(unresolved_schema)
413                    else {
414                        return Err(create_error());
415                    };
416                    if let Some(idx) = record_schema.lookup.get(key) {
417                        value = &fields[*idx].1;
418                        unresolved_schema = &record_schema.fields[*idx].schema;
419                        i += 1;
420                        continue;
421                    }
422                }
423                _ => (),
424            }
425            Err(create_error())?;
426        }
427
428        self.options
429            .inner
430            .convert_to_datum(unresolved_schema, value, type_expected)
431    }
432}
433
434/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`.
435pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
436    let variants = union_schema.variants();
437    // Note: `[null, null] is invalid`, we don't need to worry about that.
438    if variants.len() == 2 && variants.contains(&Schema::Null) {
439        let inner_schema = variants
440            .iter()
441            .find(|s| !matches!(s, &&Schema::Null))
442            .unwrap();
443        Some(inner_schema)
444    } else {
445        None
446    }
447}
448
449pub(crate) fn unix_epoch_days() -> i32 {
450    Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
451}
452
453pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
454    match avro {
455        Value::Null => builder.add_null(),
456        Value::Boolean(b) => builder.add_bool(*b),
457        Value::Int(i) => builder.add_i64(*i as i64),
458        Value::String(s) => builder.add_string(s),
459        Value::Map(m) => {
460            builder.begin_object();
461            for (k, v) in m {
462                builder.add_string(k);
463                avro_to_jsonb(v, builder)?;
464            }
465            builder.end_object()
466        }
467        // same representation as map
468        Value::Record(r) => {
469            builder.begin_object();
470            for (k, v) in r {
471                builder.add_string(k);
472                avro_to_jsonb(v, builder)?;
473            }
474            builder.end_object()
475        }
476        Value::Array(a) => {
477            builder.begin_array();
478            for v in a {
479                avro_to_jsonb(v, builder)?;
480            }
481            builder.end_array()
482        }
483
484        // TODO: figure out where the following encoding is reasonable before enabling them.
485        // See discussions: https://github.com/risingwavelabs/risingwave/pull/16948
486
487        // jsonbb supports int64, but JSON spec does not allow it. How should we handle it?
488        // BTW, protobuf canonical JSON converts int64 to string.
489        // Value::Long(l) => builder.add_i64(*l),
490        // Value::Float(f) => {
491        //     if f.is_nan() || f.is_infinite() {
492        //         // XXX: pad null or return err here?
493        //         builder.add_null()
494        //     } else {
495        //         builder.add_f64(*f as f64)
496        //     }
497        // }
498        // Value::Double(f) => {
499        //     if f.is_nan() || f.is_infinite() {
500        //         // XXX: pad null or return err here?
501        //         builder.add_null()
502        //     } else {
503        //         builder.add_f64(*f)
504        //     }
505        // }
506        // // XXX: What encoding to use?
507        // // ToText is \x plus hex string.
508        // Value::Bytes(b) => builder.add_string(&ToText::to_text(&b.as_slice())),
509        // Value::Enum(_, symbol) => {
510        //     builder.add_string(&symbol);
511        // }
512        // Value::Uuid(id) => builder.add_string(&id.as_hyphenated().to_string()),
513        // // For Union, one concern is that the avro union is tagged (like rust enum) but json union is untagged (like c union).
514        // // When the union consists of multiple records, it is possible to distinguish which variant is active in avro, but in json they will all become jsonb objects and indistinguishable.
515        // Value::Union(_, v) => avro_to_jsonb(v, builder)?
516        // XXX: pad null or return err here?
517        v @ (Value::Long(_)
518        | Value::Float(_)
519        | Value::Double(_)
520        | Value::Bytes(_)
521        | Value::Enum(_, _)
522        | Value::Fixed(_, _)
523        | Value::Date(_)
524        | Value::Decimal(_)
525        | Value::BigDecimal(_)
526        | Value::TimeMillis(_)
527        | Value::TimeMicros(_)
528        | Value::TimestampMillis(_)
529        | Value::TimestampMicros(_)
530        | Value::TimestampNanos(_)
531        | Value::LocalTimestampMillis(_)
532        | Value::LocalTimestampMicros(_)
533        | Value::LocalTimestampNanos(_)
534        | Value::Duration(_)
535        | Value::Uuid(_)
536        | Value::Union(_, _)) => {
537            bail_uncategorized!(
538                "unimplemented conversion from avro to jsonb: {:?}",
539                ValueKind::from(v)
540            )
541        }
542    }
543    Ok(())
544}
545
546#[cfg(test)]
547mod tests {
548    use std::str::FromStr;
549
550    use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
551    use expect_test::expect;
552    use risingwave_common::types::{Datum, Decimal};
553
554    use super::*;
555
556    /// Test the behavior of the Rust Avro lib for handling union with logical type.
557    #[test]
558    fn test_avro_lib_union() {
559        // duplicate types
560        let s = Schema::parse_str(r#"["null", "null"]"#);
561        expect![[r#"
562            Err(
563                Error {
564                    details: Unions cannot contain duplicate types,
565                },
566            )
567        "#]]
568        .assert_debug_eq(&s);
569        let s = Schema::parse_str(r#"["int", "int"]"#);
570        expect![[r#"
571            Err(
572                Error {
573                    details: Unions cannot contain duplicate types,
574                },
575            )
576        "#]]
577        .assert_debug_eq(&s);
578        // multiple map/array are considered as the same type, regardless of the element type!
579        let s = Schema::parse_str(
580            r#"[
581"null",
582{
583    "type": "map",
584    "values" : "long",
585    "default": {}
586},
587{
588    "type": "map",
589    "values" : "int",
590    "default": {}
591}
592]
593"#,
594        );
595        expect![[r#"
596            Err(
597                Error {
598                    details: Unions cannot contain duplicate types,
599                },
600            )
601        "#]]
602        .assert_debug_eq(&s);
603        let s = Schema::parse_str(
604            r#"[
605"null",
606{
607    "type": "array",
608    "items" : "long",
609    "default": {}
610},
611{
612    "type": "array",
613    "items" : "int",
614    "default": {}
615}
616]
617"#,
618        );
619        expect![[r#"
620            Err(
621                Error {
622                    details: Unions cannot contain duplicate types,
623                },
624            )
625        "#]]
626        .assert_debug_eq(&s);
627        // multiple named types
628        let s = Schema::parse_str(
629            r#"[
630"null",
631{"type":"fixed","name":"a","size":16},
632{"type":"fixed","name":"b","size":32}
633]
634"#,
635        );
636        expect![[r#"
637            Ok(
638                Union(
639                    UnionSchema {
640                        schemas: [
641                            Null,
642                            Fixed(
643                                FixedSchema {
644                                    name: Name {
645                                        name: "a",
646                                        namespace: None,
647                                    },
648                                    aliases: None,
649                                    doc: None,
650                                    size: 16,
651                                    default: None,
652                                    attributes: {},
653                                },
654                            ),
655                            Fixed(
656                                FixedSchema {
657                                    name: Name {
658                                        name: "b",
659                                        namespace: None,
660                                    },
661                                    aliases: None,
662                                    doc: None,
663                                    size: 32,
664                                    default: None,
665                                    attributes: {},
666                                },
667                            ),
668                        ],
669                        variant_index: {
670                            Null: 0,
671                        },
672                    },
673                ),
674            )
675        "#]]
676        .assert_debug_eq(&s);
677
678        // union in union
679        let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
680        expect![[r#"
681            Err(
682                Error {
683                    details: Unions may not directly contain a union,
684                },
685            )
686        "#]]
687        .assert_debug_eq(&s);
688
689        // logical type
690        let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
691        expect![[r#"
692            Union(
693                UnionSchema {
694                    schemas: [
695                        Null,
696                        Uuid,
697                    ],
698                    variant_index: {
699                        Null: 0,
700                        Uuid: 1,
701                    },
702                },
703            )
704        "#]]
705        .assert_debug_eq(&s);
706        // Note: Java Avro lib rejects this (logical type unions with its physical type)
707        let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
708        expect![[r#"
709            Union(
710                UnionSchema {
711                    schemas: [
712                        String,
713                        Uuid,
714                    ],
715                    variant_index: {
716                        String: 0,
717                        Uuid: 1,
718                    },
719                },
720            )
721        "#]]
722        .assert_debug_eq(&s);
723        // Note: Java Avro lib rejects this (logical type unions with its physical type)
724        let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
725        expect![[r#"
726            Union(
727                UnionSchema {
728                    schemas: [
729                        Int,
730                        Date,
731                    ],
732                    variant_index: {
733                        Int: 0,
734                        Date: 1,
735                    },
736                },
737            )
738        "#]]
739        .assert_debug_eq(&s);
740        // Note: Java Avro lib allows this (2 decimal with different "name")
741        let s = Schema::parse_str(
742            r#"[
743{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
744{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
745]"#,
746        );
747        expect![[r#"
748            Err(
749                Error {
750                    details: Unions cannot contain duplicate types,
751                },
752            )
753        "#]]
754        .assert_debug_eq(&s);
755    }
756
#[test]
fn test_avro_lib_union_record_bug() {
    // A record whose single field is a union containing several named record branches.
    let union_schema_json = r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "unionTypeComplex",
          "type": [
            "null",
            {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
            {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
            {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
          ]
        }
      ]
    }
        "#;
    let union_schema = Schema::parse_str(union_schema_json).unwrap();

    // Two encoded bytes; the snapshots below show them decoding to a union
    // branch index followed by the int 6.
    let encoded = hex::decode("060c").unwrap();

    // Decoding with the writer schema only yields branch 3 (Sms), which is the correct answer.
    let decoded_writer_only = from_avro_datum(&union_schema, &mut &encoded[..], None);
    expect![[r#"
                Ok(
                    Record(
                        [
                            (
                                "unionTypeComplex",
                                Union(
                                    3,
                                    Record(
                                        [
                                            (
                                                "inner",
                                                Int(
                                                    6,
                                                ),
                                            ),
                                        ],
                                    ),
                                ),
                            ),
                        ],
                    ),
                )
            "#]]
    .assert_debug_eq(&decoded_writer_only);

    // Bug: as soon as a reader schema is supplied (even the very same schema),
    // the result becomes branch 2 (Fax).
    let decoded_with_reader =
        from_avro_datum(&union_schema, &mut &encoded[..], Some(&union_schema));
    expect![[r#"
                Ok(
                    Record(
                        [
                            (
                                "unionTypeComplex",
                                Union(
                                    2,
                                    Record(
                                        [
                                            (
                                                "inner",
                                                Int(
                                                    6,
                                                ),
                                            ),
                                        ],
                                    ),
                                ),
                            ),
                        ],
                    ),
                )
            "#]]
    .assert_debug_eq(&decoded_with_reader);

    // The second half of this test explains the behavior above.
    // The two record schemas below are incompatible according to the spec:
    // https://avro.apache.org/docs/1.11.1/specification/_print/#schema-resolution
    // > both schemas are records with the _same (unqualified) name_
    // `from_avro_datum` first reads the value using the writer schema, and afterwards
    // interprets that value using only the reader schema.
    // A decoded value carries no record "name", so the conversion wrongly succeeds.
    // The correct approach would use both the writer and the reader schema in the
    // second step when interpreting the value.

    let writer_schema_json = r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "a",
          "type": "int"
        }
      ]
    }
        "#;
    let writer_schema = Schema::parse_str(writer_schema_json).unwrap();

    // Same shape as the writer schema, but with a different record name.
    let reader_schema_json = r#"
{
  "type": "record",
  "name": "Root222",
  "fields": [
    {
      "name": "a",
      "type": "int"
    }
  ]
}
    "#;
    let reader_schema = Schema::parse_str(reader_schema_json).unwrap();

    let encoded = hex::decode("0c").unwrap();
    let decoded = from_avro_datum(&writer_schema, &mut &encoded[..], Some(&reader_schema));
    expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "a",
                            Int(
                                6,
                            ),
                        ),
                    ],
                ),
            )
        "#]]
    .assert_debug_eq(&decoded);
}
894
#[test]
fn test_convert_decimal() {
    // Wraps raw bytes in an Avro decimal, converts it to a big integer and
    // rescales it into a `rust_decimal::Decimal`.
    let decode = |bytes: Vec<u8>, scale| {
        scaled_bigint_to_rust_decimal(AvroDecimal::from(bytes).into(), scale).unwrap()
    };

    // 280 (bytes 0x01 0x18, scale 0)
    let from_bytes = decode(vec![1, 24], 0);
    assert_eq!(from_bytes, rust_decimal::Decimal::from(280));

    // 28.1 (unscaled 281, scale 1)
    let from_bytes = decode(vec![1, 25], 1);
    assert_eq!(from_bytes, rust_decimal::Decimal::try_from(28.1).unwrap());

    // 1.1234567891, this time starting from a `BigInt` rather than bytes
    let unscaled = BigInt::from(11234567891_i64);
    let from_bigint = scaled_bigint_to_rust_decimal(unscaled, 10).unwrap();
    assert_eq!(
        from_bigint,
        rust_decimal::Decimal::try_from(1.1234567891).unwrap()
    );

    // 1.123456789123456789123456789
    let from_bytes = decode(vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21], 27);
    assert_eq!(
        from_bytes,
        rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
    );
}
926
927    /// Convert Avro value to datum.For now, support the following [Avro type](https://avro.apache.org/docs/current/spec.html).
928    ///  - boolean
929    ///  - int : i32
930    ///  - long: i64
931    ///  - float: f32
932    ///  - double: f64
933    ///  - string: String
934    ///  - Date (the number of days from the unix epoch, 1970-1-1 UTC)
935    ///  - Timestamp (the number of milliseconds from the unix epoch,  1970-1-1 00:00:00.000 UTC)
936    fn from_avro_value(
937        value: Value,
938        value_schema: &Schema,
939        shape: &DataType,
940    ) -> anyhow::Result<Datum> {
941        Ok(AvroParseOptions::create(value_schema)
942            .inner
943            .convert_to_datum(value_schema, &value, shape)?
944            .to_owned_datum())
945    }
946
#[test]
fn test_avro_timestamptz_micros() {
    // The same instant expressed once in microseconds and once in milliseconds.
    let [from_micros, from_millis] = [
        (
            Value::TimestampMicros(1620000000000000),
            Schema::TimestampMicros,
        ),
        (
            Value::TimestampMillis(1620000000000),
            Schema::TimestampMillis,
        ),
    ]
    .map(|(value, schema)| from_avro_value(value, &schema, &DataType::Timestamptz).unwrap());

    // Both precisions must produce the identical timestamptz datum.
    let expected = Some(ScalarImpl::Timestamptz(
        Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap(),
    ));
    assert_eq!(from_micros, expected);
    assert_eq!(from_millis, expected);
}
968
#[test]
fn test_decimal_truncate() {
    let decimal_schema = Schema::parse_str(
        r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 38,
                "scale": 18
            }
            "#,
    )
    .unwrap();

    // Seven 0x3f bytes form the unscaled integer 17802464409370431; the
    // schema's scale of 18 places it at 0.017802464409370431.
    let unscaled_bytes = vec![0x3f_u8; 7];
    let datum = from_avro_value(
        Value::Decimal(AvroDecimal::from(unscaled_bytes)),
        &decimal_schema,
        &DataType::Decimal,
    )
    .unwrap();

    let expected = rust_decimal::Decimal::from_str("0.017802464409370431").unwrap();
    assert_eq!(
        datum,
        Some(ScalarImpl::Decimal(Decimal::Normalized(expected)))
    );
}
992
#[test]
fn test_variable_scale_decimal() {
    // A record with an int `scale` field and a bytes `value` field.
    let record_schema = Schema::parse_str(
        r#"
            {
                "type": "record",
                "name": "VariableScaleDecimal",
                "namespace": "io.debezium.data",
                "fields": [
                    {
                        "name": "scale",
                        "type": "int"
                    },
                    {
                        "name": "value",
                        "type": "bytes"
                    }
                ]
            }
            "#,
    )
    .unwrap();

    // Bytes 0x01 0x02 0x03 with scale 0 are expected to decode to 66051 (0x010203).
    let scale_field = ("scale".to_owned(), Value::Int(0));
    let value_field = ("value".to_owned(), Value::Bytes(vec![0x01, 0x02, 0x03]));
    let record = Value::Record(vec![scale_field, value_field]);

    let datum = from_avro_value(record, &record_schema, &DataType::Decimal).unwrap();
    let expected = Some(ScalarImpl::Decimal(Decimal::from(66051)));
    assert_eq!(datum, expected);
}
1023}