risingwave_connector_codec/decoder/avro/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25    DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26    Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
34#[derive(Clone)]
35/// Options for parsing an `AvroValue` into Datum, with a root avro schema.
36pub struct AvroParseOptions<'a> {
37    /// The avro schema at root level
38    root_schema: &'a Schema,
39    /// The immutable "global" context during recursive parsing
40    inner: AvroParseOptionsInner<'a>,
41}
42
43#[derive(Clone)]
44/// Options for parsing an `AvroValue` into Datum, with names resolved from root schema.
45struct AvroParseOptionsInner<'a> {
46    /// Mapping from type names to actual schema
47    refs: NamesRef<'a>,
48    /// Strict Mode
49    /// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
50    relax_numeric: bool,
51}
52
53impl<'a> AvroParseOptions<'a> {
54    pub fn create(root_schema: &'a Schema) -> Self {
55        let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56            .expect("avro schema is self contained");
57        Self {
58            root_schema,
59            inner: AvroParseOptionsInner {
60                refs: resolved.get_names().clone(),
61                relax_numeric: true,
62            },
63        }
64    }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68    fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69        match schema {
70            Schema::Ref { name } => self.refs[name],
71            _ => schema,
72        }
73    }
74
75    /// Parse an avro value into expected type.
76    ///
77    /// 3 kinds of type info are used to parsing:
78    /// - `type_expected`. The type that we expect the value is.
79    /// - value type. The type info together with the value argument.
80    /// - schema. The `AvroSchema` provided in option.
81    ///
82    /// Cases: (FIXME: Is this precise?)
83    /// - If both `type_expected` and schema are provided, it will check both strictly.
84    /// - If only `type_expected` is provided, it will try to match the value type
85    ///   and the `type_expected`, converting the value if possible.
86    /// - If only value is provided (without schema and `type_expected`),
87    ///   the `DataType` will be inferred.
88    fn convert_to_datum<'b>(
89        &self,
90        unresolved_schema: &'a Schema,
91        value: &'b Value,
92        type_expected: &DataType,
93    ) -> AccessResult<DatumCow<'b>>
94    where
95        'b: 'a,
96    {
97        let create_error = || AccessError::TypeError {
98            expected: format!("{:?}", type_expected),
99            got: format!("{:?}", value),
100            value: String::new(),
101        };
102
103        macro_rules! borrowed {
104            ($v:expr) => {
105                return Ok(DatumCow::Borrowed(Some($v.into())))
106            };
107        }
108
109        let v: ScalarImpl = match (type_expected, value) {
110            (_, Value::Null) => return Ok(DatumCow::NULL),
111            // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) -----
112            (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114                    // XXX: Is this branch actually unreachable? (if self.schema is correctly used)
115                    return Err(create_error());
116                };
117
118                if let Some(inner) = get_nullable_union_inner(u) {
119                    // nullable Union ([null, record])
120                    return self.convert_to_datum(inner, v, type_expected);
121                }
122                let variant_schema = &u.variants()[*variant as usize];
123
124                if matches!(variant_schema, &Schema::Null) {
125                    return Ok(DatumCow::NULL);
126                }
127
128                // Here we compare the field name, instead of using the variant idx to find the field idx.
129                // The latter approach might also work, but might be more error-prone.
130                // We will need to get the index of the "null" variant, and then re-map the variant index to the field index.
131                // XXX: probably we can unwrap here (if self.schema is correctly used)
132                let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134                let mut fields = Vec::with_capacity(struct_type_info.len());
135                for (field_name, field_type) in struct_type_info.iter() {
136                    if field_name == expected_field_name {
137                        let datum = self
138                            .convert_to_datum(variant_schema, v, field_type)?
139                            .to_owned_datum();
140
141                        fields.push(datum)
142                    } else {
143                        fields.push(None)
144                    }
145                }
146                StructValue::new(fields).into()
147            }
148            // nullable Union ([null, T])
149            (_, Value::Union(_, v)) => {
150                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151                    return Err(create_error());
152                };
153                let Some(schema) = get_nullable_union_inner(u) else {
154                    return Err(create_error());
155                };
156                return self.convert_to_datum(schema, v, type_expected);
157            }
158            // ---- Boolean -----
159            (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160            // ---- Int16 -----
161            (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162            (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164            // ---- Int32 -----
165            (DataType::Int32, Value::Int(i)) => (*i).into(),
166            (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167            // ---- Int64 -----
168            (DataType::Int64, Value::Long(i)) => (*i).into(),
169            (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170            // ---- Float32 -----
171            (DataType::Float32, Value::Float(i)) => (*i).into(),
172            (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173            // ---- Float64 -----
174            (DataType::Float64, Value::Double(i)) => (*i).into(),
175            (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176            // ---- Decimal -----
177            (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178                let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179                    Schema::Decimal(DecimalSchema {
180                        precision, scale, ..
181                    }) => (*precision, *scale),
182                    _ => Err(create_error())?,
183                };
184                let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185                    .map_err(|_| create_error())?;
186                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187            }
188            (DataType::Decimal, Value::Record(fields)) => {
189                // VariableScaleDecimal has fixed fields, scale(int) and value(bytes)
190                let find_in_records = |field_name: &str| {
191                    fields
192                        .iter()
193                        .find(|field| field.0 == field_name)
194                        .map(|field| &field.1)
195                        .ok_or_else(|| {
196                            uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197                        })
198                };
199                let scale = match find_in_records("scale")? {
200                    Value::Int(scale) => *scale,
201                    avro_value => bail_uncategorized!(
202                        "scale field in VariableScaleDecimal is not int, got {:?}",
203                        avro_value
204                    ),
205                };
206
207                let value: BigInt = match find_in_records("value")? {
208                    Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209                    avro_value => bail_uncategorized!(
210                        "value field in VariableScaleDecimal is not bytes, got {:?}",
211                        avro_value
212                    ),
213                };
214
215                let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217            }
218            // ---- Time -----
219            (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220                .map_err(|_| create_error())?
221                .into(),
222            (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223                .map_err(|_| create_error())?
224                .into(),
225            // ---- Date -----
226            (DataType::Date, Value::Date(days)) => {
227                Date::with_days_since_ce(days + unix_epoch_days())
228                    .map_err(|_| create_error())?
229                    .into()
230            }
231            // ---- Varchar -----
232            (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233            (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234            // ---- Timestamp -----
235            (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236                .map_err(|_| create_error())?
237                .into(),
238            (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239                .map_err(|_| create_error())?
240                .into(),
241
242            // ---- TimestampTz -----
243            (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244                .ok_or_else(|| {
245                    uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246                })?
247                .into(),
248            (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249                Timestamptz::from_micros(*us).into()
250            }
251
252            // ---- Interval -----
253            (DataType::Interval, Value::Duration(duration)) => {
254                let months = u32::from(duration.months()) as i32;
255                let days = u32::from(duration.days()) as i32;
256                let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
257                ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258            }
259            // ---- Struct -----
260            (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261                let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262                    return Err(create_error());
263                };
264                struct_type_info
265                    .iter()
266                    .map(|(field_name, field_type)| {
267                        if let Some(idx) = record_schema.lookup.get(field_name) {
268                            let value = &descs[*idx].1;
269                            let schema = &record_schema.fields[*idx].schema;
270                            Ok(self
271                                .convert_to_datum(schema, value, field_type)?
272                                .to_owned_datum())
273                        } else {
274                            Ok(None)
275                        }
276                    })
277                    .collect::<Result<_, AccessError>>()?
278            })
279            .into(),
280            // ---- List -----
281            (DataType::List(list_type), Value::Array(array)) => ListValue::new({
282                let Schema::Array(element_schema) = self.lookup_ref(unresolved_schema) else {
283                    return Err(create_error());
284                };
285                let schema = element_schema;
286                let elem_type = list_type.elem();
287                let mut builder = elem_type.create_array_builder(array.len());
288                for v in array {
289                    let value = self.convert_to_datum(schema, v, elem_type)?;
290                    builder.append(value);
291                }
292                builder.finish()
293            })
294            .into(),
295            // ---- Bytea -----
296            (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
297            // ---- Jsonb -----
298            (DataType::Jsonb, v @ Value::Map(_)) => {
299                let mut builder = jsonbb::Builder::default();
300                avro_to_jsonb(v, &mut builder)?;
301                let jsonb = builder.finish();
302                debug_assert!(jsonb.as_ref().is_object());
303                JsonbVal::from(jsonb).into()
304            }
305            (DataType::Varchar, Value::Uuid(uuid)) => {
306                uuid.as_hyphenated().to_string().into_boxed_str().into()
307            }
308            (DataType::Map(map_type), Value::Map(map)) => {
309                let Schema::Map(value_schema) = self.lookup_ref(unresolved_schema) else {
310                    return Err(create_error());
311                };
312                let schema = value_schema;
313                let mut builder = map_type
314                    .clone()
315                    .into_struct()
316                    .create_array_builder(map.len());
317                // Since the map is HashMap, we can ensure
318                // key is non-null and unique, keys and values have the same length.
319
320                // NOTE: HashMap's iter order is non-deterministic, but MapValue's
321                // order matters. We sort by key here to have deterministic order
322                // in tests. We might consider removing this, or make all MapValue sorted
323                // in the future.
324                for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
325                    let value_datum = self
326                        .convert_to_datum(schema, v, map_type.value())?
327                        .to_owned_datum();
328                    builder.append(
329                        StructValue::new(vec![Some(k.as_str().into()), value_datum])
330                            .to_owned_datum(),
331                    );
332                }
333                let list = ListValue::new(builder.finish());
334                MapValue::from_entries(list).into()
335            }
336
337            (_expected, _got) => Err(create_error())?,
338        };
339        Ok(DatumCow::Owned(Some(v)))
340    }
341}
342
343pub struct AvroAccess<'a> {
344    value: &'a Value,
345    options: AvroParseOptions<'a>,
346}
347
348impl<'a> AvroAccess<'a> {
349    pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
350        Self {
351            value: root_value,
352            options,
353        }
354    }
355}
356
357impl Access for AvroAccess<'_> {
358    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
359        let mut value = self.value;
360        let mut unresolved_schema = self.options.root_schema;
361
362        debug_assert!(
363            path.len() == 1
364                || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
365            "unexpected path access: {:?}",
366            path
367        );
368        let mut i = 0;
369        while i < path.len() {
370            let key = path[i];
371            let create_error = || AccessError::Undefined {
372                name: key.to_owned(),
373                path: path.iter().take(i).join("."),
374            };
375            match value {
376                Value::Union(_, v) => {
377                    // The debezium "before" field is a nullable union.
378                    // "fields": [
379                    // {
380                    //     "name": "before",
381                    //     "type": [
382                    //         "null",
383                    //         {
384                    //             "type": "record",
385                    //             "name": "Value",
386                    //             "fields": [...],
387                    //         }
388                    //     ],
389                    //     "default": null
390                    // },
391                    // {
392                    //     "name": "after",
393                    //     "type": [
394                    //         "null",
395                    //         "Value"
396                    //     ],
397                    //     "default": null
398                    // },
399                    // ...]
400                    value = v;
401                    let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
402                        return Err(create_error());
403                    };
404                    let Some(schema) = get_nullable_union_inner(u) else {
405                        return Err(create_error());
406                    };
407                    unresolved_schema = schema;
408                    continue;
409                }
410                Value::Record(fields) => {
411                    let Schema::Record(record_schema) =
412                        self.options.inner.lookup_ref(unresolved_schema)
413                    else {
414                        return Err(create_error());
415                    };
416                    if let Some(idx) = record_schema.lookup.get(key) {
417                        value = &fields[*idx].1;
418                        unresolved_schema = &record_schema.fields[*idx].schema;
419                        i += 1;
420                        continue;
421                    }
422                }
423                _ => (),
424            }
425            Err(create_error())?;
426        }
427
428        self.options
429            .inner
430            .convert_to_datum(unresolved_schema, value, type_expected)
431    }
432}
433
434/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`.
435pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
436    let variants = union_schema.variants();
437    // Note: `[null, null] is invalid`, we don't need to worry about that.
438    if variants.len() == 2 && variants.contains(&Schema::Null) {
439        let inner_schema = variants
440            .iter()
441            .find(|s| !matches!(s, &&Schema::Null))
442            .unwrap();
443        Some(inner_schema)
444    } else {
445        None
446    }
447}
448
449pub(crate) fn unix_epoch_days() -> i32 {
450    Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
451}
452
453pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
454    match avro {
455        Value::Null => builder.add_null(),
456        Value::Boolean(b) => builder.add_bool(*b),
457        Value::Int(i) => builder.add_i64(*i as i64),
458        Value::String(s) => builder.add_string(s),
459        Value::Map(m) => {
460            builder.begin_object();
461            for (k, v) in m {
462                builder.add_string(k);
463                avro_to_jsonb(v, builder)?;
464            }
465            builder.end_object()
466        }
467        // same representation as map
468        Value::Record(r) => {
469            builder.begin_object();
470            for (k, v) in r {
471                builder.add_string(k);
472                avro_to_jsonb(v, builder)?;
473            }
474            builder.end_object()
475        }
476        Value::Array(a) => {
477            builder.begin_array();
478            for v in a {
479                avro_to_jsonb(v, builder)?;
480            }
481            builder.end_array()
482        }
483
484        // TODO: figure out where the following encoding is reasonable before enabling them.
485        // See discussions: https://github.com/risingwavelabs/risingwave/pull/16948
486
487        // jsonbb supports int64, but JSON spec does not allow it. How should we handle it?
488        // BTW, protobuf canonical JSON converts int64 to string.
489        // Value::Long(l) => builder.add_i64(*l),
490        // Value::Float(f) => {
491        //     if f.is_nan() || f.is_infinite() {
492        //         // XXX: pad null or return err here?
493        //         builder.add_null()
494        //     } else {
495        //         builder.add_f64(*f as f64)
496        //     }
497        // }
498        // Value::Double(f) => {
499        //     if f.is_nan() || f.is_infinite() {
500        //         // XXX: pad null or return err here?
501        //         builder.add_null()
502        //     } else {
503        //         builder.add_f64(*f)
504        //     }
505        // }
506        // // XXX: What encoding to use?
507        // // ToText is \x plus hex string.
508        // Value::Bytes(b) => builder.add_string(&ToText::to_text(&b.as_slice())),
509        // Value::Enum(_, symbol) => {
510        //     builder.add_string(&symbol);
511        // }
512        // Value::Uuid(id) => builder.add_string(&id.as_hyphenated().to_string()),
513        // // For Union, one concern is that the avro union is tagged (like rust enum) but json union is untagged (like c union).
514        // // When the union consists of multiple records, it is possible to distinguish which variant is active in avro, but in json they will all become jsonb objects and indistinguishable.
515        // Value::Union(_, v) => avro_to_jsonb(v, builder)?
516        // XXX: pad null or return err here?
517        v @ (Value::Long(_)
518        | Value::Float(_)
519        | Value::Double(_)
520        | Value::Bytes(_)
521        | Value::Enum(_, _)
522        | Value::Fixed(_, _)
523        | Value::Date(_)
524        | Value::Decimal(_)
525        | Value::TimeMillis(_)
526        | Value::TimeMicros(_)
527        | Value::TimestampMillis(_)
528        | Value::TimestampMicros(_)
529        | Value::LocalTimestampMillis(_)
530        | Value::LocalTimestampMicros(_)
531        | Value::Duration(_)
532        | Value::Uuid(_)
533        | Value::Union(_, _)) => {
534            bail_uncategorized!(
535                "unimplemented conversion from avro to jsonb: {:?}",
536                ValueKind::from(v)
537            )
538        }
539    }
540    Ok(())
541}
542
543#[cfg(test)]
544mod tests {
545    use std::str::FromStr;
546
547    use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
548    use expect_test::expect;
549    use risingwave_common::types::{Datum, Decimal};
550
551    use super::*;
552
    /// Test the behavior of the Rust Avro lib for handling union with logical type.
    ///
    /// Pins what `apache_avro`'s `Schema::parse_str` accepts and rejects for unions
    /// (duplicate types, nested unions, named types, logical types) via snapshot
    /// assertions, so a library upgrade that changes this behavior makes the test fail.
    #[test]
    fn test_avro_lib_union() {
        // duplicate types
        let s = Schema::parse_str(r#"["null", "null"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        let s = Schema::parse_str(r#"["int", "int"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // multiple map/array are considered as the same type, regardless of the element type!
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "map",
    "values" : "long",
    "default": {}
},
{
    "type": "map",
    "values" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "array",
    "items" : "long",
    "default": {}
},
{
    "type": "array",
    "items" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
        Err(
            Unions cannot contain duplicate types,
        )
    "#]]
        .assert_debug_eq(&s);
        // multiple named types
        // Distinctly named `fixed` types are accepted; note that only `Null` is recorded
        // in `variant_index` for this union (see the snapshot below).
        let s = Schema::parse_str(
            r#"[
"null",
{"type":"fixed","name":"a","size":16},
{"type":"fixed","name":"b","size":32}
]
"#,
        );
        expect![[r#"
            Ok(
                Union(
                    UnionSchema {
                        schemas: [
                            Null,
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "a",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 16,
                                    attributes: {},
                                },
                            ),
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "b",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 32,
                                    attributes: {},
                                },
                            ),
                        ],
                        variant_index: {
                            Null: 0,
                        },
                    },
                ),
            )
        "#]]
        .assert_debug_eq(&s);

        // union in union
        let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
        expect![[r#"
            Err(
                Unions may not directly contain a union,
            )
        "#]]
        .assert_debug_eq(&s);

        // logical type
        let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Null,
                        Uuid,
                    ],
                    variant_index: {
                        Null: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Note: Java Avro lib rejects this (logical type unions with its physical type)
        let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        String,
                        Uuid,
                    ],
                    variant_index: {
                        String: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Note: Java Avro lib rejects this (logical type unions with its physical type)
        let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Int,
                        Date,
                    ],
                    variant_index: {
                        Int: 0,
                        Date: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Note: Java Avro lib allows this (2 decimal with different "name")
        // The Rust lib instead treats both as the same `Decimal` type and rejects the union.
        let s = Schema::parse_str(
            r#"[
{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
]"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
    }
739
740    #[test]
741    fn test_avro_lib_union_record_bug() {
742        // multiple named types (record)
743        let s = Schema::parse_str(
744            r#"
745    {
746      "type": "record",
747      "name": "Root",
748      "fields": [
749        {
750          "name": "unionTypeComplex",
751          "type": [
752            "null",
753            {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
754            {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
755            {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
756          ]
757        }
758      ]
759    }
760        "#,
761        )
762        .unwrap();
763
764        let bytes = hex::decode("060c").unwrap();
765        // Correct should be variant 3 (Sms)
766        let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None);
767        expect![[r#"
768                Ok(
769                    Record(
770                        [
771                            (
772                                "unionTypeComplex",
773                                Union(
774                                    3,
775                                    Record(
776                                        [
777                                            (
778                                                "inner",
779                                                Int(
780                                                    6,
781                                                ),
782                                            ),
783                                        ],
784                                    ),
785                                ),
786                            ),
787                        ],
788                    ),
789                )
790            "#]]
791        .assert_debug_eq(&correct_value);
792        // Bug: We got variant 2 (Fax) here, if we pass the reader schema.
793        let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s));
794        expect![[r#"
795                Ok(
796                    Record(
797                        [
798                            (
799                                "unionTypeComplex",
800                                Union(
801                                    2,
802                                    Record(
803                                        [
804                                            (
805                                                "inner",
806                                                Int(
807                                                    6,
808                                                ),
809                                            ),
810                                        ],
811                                    ),
812                                ),
813                            ),
814                        ],
815                    ),
816                )
817            "#]]
818        .assert_debug_eq(&wrong_value);
819
820        // The bug below can explain what happened.
821        // The two records below are actually incompatible: https://avro.apache.org/docs/1.11.1/specification/_print/#schema-resolution
822        // > both schemas are records with the _same (unqualified) name_
823        // In from_avro_datum, it first reads the value with the writer schema, and then
824        // it just uses the reader schema to interpret the value.
825        // The value doesn't have record "name" information. So it wrongly passed the conversion.
826        // The correct way is that we need to use both the writer and reader schema in the second step to interpret the value.
827
828        let s = Schema::parse_str(
829            r#"
830    {
831      "type": "record",
832      "name": "Root",
833      "fields": [
834        {
835          "name": "a",
836          "type": "int"
837        }
838      ]
839    }
840        "#,
841        )
842        .unwrap();
843        let s2 = Schema::parse_str(
844            r#"
845{
846  "type": "record",
847  "name": "Root222",
848  "fields": [
849    {
850      "name": "a",
851      "type": "int"
852    }
853  ]
854}
855    "#,
856        )
857        .unwrap();
858
859        let bytes = hex::decode("0c").unwrap();
860        let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2));
861        expect![[r#"
862            Ok(
863                Record(
864                    [
865                        (
866                            "a",
867                            Int(
868                                6,
869                            ),
870                        ),
871                    ],
872                ),
873            )
874        "#]]
875        .assert_debug_eq(&value);
876    }
877
878    #[test]
879    fn test_convert_decimal() {
880        // 280
881        let v = vec![1, 24];
882        let avro_decimal = AvroDecimal::from(v);
883        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 0).unwrap();
884        assert_eq!(rust_decimal, rust_decimal::Decimal::from(280));
885
886        // 28.1
887        let v = vec![1, 25];
888        let avro_decimal = AvroDecimal::from(v);
889        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 1).unwrap();
890        assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
891
892        // 1.1234567891
893        let value = BigInt::from(11234567891_i64);
894        let decimal = scaled_bigint_to_rust_decimal(value, 10).unwrap();
895        assert_eq!(
896            decimal,
897            rust_decimal::Decimal::try_from(1.1234567891).unwrap()
898        );
899
900        // 1.123456789123456789123456789
901        let v = vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21];
902        let avro_decimal = AvroDecimal::from(v);
903        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 27).unwrap();
904        assert_eq!(
905            rust_decimal,
906            rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
907        );
908    }
909
910    /// Convert Avro value to datum.For now, support the following [Avro type](https://avro.apache.org/docs/current/spec.html).
911    ///  - boolean
912    ///  - int : i32
913    ///  - long: i64
914    ///  - float: f32
915    ///  - double: f64
916    ///  - string: String
917    ///  - Date (the number of days from the unix epoch, 1970-1-1 UTC)
918    ///  - Timestamp (the number of milliseconds from the unix epoch,  1970-1-1 00:00:00.000 UTC)
919    fn from_avro_value(
920        value: Value,
921        value_schema: &Schema,
922        shape: &DataType,
923    ) -> anyhow::Result<Datum> {
924        Ok(AvroParseOptions::create(value_schema)
925            .inner
926            .convert_to_datum(value_schema, &value, shape)?
927            .to_owned_datum())
928    }
929
930    #[test]
931    fn test_avro_timestamptz_micros() {
932        let v1 = Value::TimestampMicros(1620000000000000);
933        let v2 = Value::TimestampMillis(1620000000000);
934        let value_schema1 = Schema::TimestampMicros;
935        let value_schema2 = Schema::TimestampMillis;
936        let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
937        let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
938        assert_eq!(
939            datum1,
940            Some(ScalarImpl::Timestamptz(
941                Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
942            ))
943        );
944        assert_eq!(
945            datum2,
946            Some(ScalarImpl::Timestamptz(
947                Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
948            ))
949        );
950    }
951
952    #[test]
953    fn test_decimal_truncate() {
954        let schema = Schema::parse_str(
955            r#"
956            {
957                "type": "bytes",
958                "logicalType": "decimal",
959                "precision": 38,
960                "scale": 18
961            }
962            "#,
963        )
964        .unwrap();
965        let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
966        let value = Value::Decimal(AvroDecimal::from(bytes));
967        let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
968        assert_eq!(
969            resp,
970            Some(ScalarImpl::Decimal(Decimal::Normalized(
971                rust_decimal::Decimal::from_str("0.017802464409370431").unwrap()
972            )))
973        );
974    }
975
976    #[test]
977    fn test_variable_scale_decimal() {
978        let schema = Schema::parse_str(
979            r#"
980            {
981                "type": "record",
982                "name": "VariableScaleDecimal",
983                "namespace": "io.debezium.data",
984                "fields": [
985                    {
986                        "name": "scale",
987                        "type": "int"
988                    },
989                    {
990                        "name": "value",
991                        "type": "bytes"
992                    }
993                ]
994            }
995            "#,
996        )
997        .unwrap();
998        let value = Value::Record(vec![
999            ("scale".to_owned(), Value::Int(0)),
1000            ("value".to_owned(), Value::Bytes(vec![0x01, 0x02, 0x03])),
1001        ]);
1002
1003        let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
1004        assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
1005    }
1006}