risingwave_connector_codec/decoder/avro/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25    DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26    Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
34#[derive(Clone)]
35/// Options for parsing an `AvroValue` into Datum, with a root avro schema.
36pub struct AvroParseOptions<'a> {
37    /// The avro schema at root level
38    root_schema: &'a Schema,
39    /// The immutable "global" context during recursive parsing
40    inner: AvroParseOptionsInner<'a>,
41}
42
43#[derive(Clone)]
44/// Options for parsing an `AvroValue` into Datum, with names resolved from root schema.
45struct AvroParseOptionsInner<'a> {
46    /// Mapping from type names to actual schema
47    refs: NamesRef<'a>,
48    /// Strict Mode
49    /// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
50    relax_numeric: bool,
51}
52
53impl<'a> AvroParseOptions<'a> {
54    pub fn create(root_schema: &'a Schema) -> Self {
55        let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56            .expect("avro schema is self contained");
57        Self {
58            root_schema,
59            inner: AvroParseOptionsInner {
60                refs: resolved.get_names().clone(),
61                relax_numeric: true,
62            },
63        }
64    }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68    fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69        match schema {
70            Schema::Ref { name } => self.refs[name],
71            _ => schema,
72        }
73    }
74
75    /// Parse an avro value into expected type.
76    ///
77    /// 3 kinds of type info are used to parsing:
78    /// - `type_expected`. The type that we expect the value is.
79    /// - value type. The type info together with the value argument.
80    /// - schema. The `AvroSchema` provided in option.
81    ///
82    /// Cases: (FIXME: Is this precise?)
83    /// - If both `type_expected` and schema are provided, it will check both strictly.
84    /// - If only `type_expected` is provided, it will try to match the value type
85    ///   and the `type_expected`, converting the value if possible.
86    /// - If only value is provided (without schema and `type_expected`),
87    ///   the `DataType` will be inferred.
88    fn convert_to_datum<'b>(
89        &self,
90        unresolved_schema: &'a Schema,
91        value: &'b Value,
92        type_expected: &DataType,
93    ) -> AccessResult<DatumCow<'b>>
94    where
95        'b: 'a,
96    {
97        let create_error = || AccessError::TypeError {
98            expected: format!("{:?}", type_expected),
99            got: format!("{:?}", value),
100            value: String::new(),
101        };
102
103        macro_rules! borrowed {
104            ($v:expr) => {
105                return Ok(DatumCow::Borrowed(Some($v.into())))
106            };
107        }
108
109        let v: ScalarImpl = match (type_expected, value) {
110            (_, Value::Null) => return Ok(DatumCow::NULL),
111            // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) -----
112            (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114                    // XXX: Is this branch actually unreachable? (if self.schema is correctly used)
115                    return Err(create_error());
116                };
117
118                if let Some(inner) = get_nullable_union_inner(u) {
119                    // nullable Union ([null, record])
120                    return self.convert_to_datum(inner, v, type_expected);
121                }
122                let variant_schema = &u.variants()[*variant as usize];
123
124                if matches!(variant_schema, &Schema::Null) {
125                    return Ok(DatumCow::NULL);
126                }
127
128                // Here we compare the field name, instead of using the variant idx to find the field idx.
129                // The latter approach might also work, but might be more error-prone.
130                // We will need to get the index of the "null" variant, and then re-map the variant index to the field index.
131                // XXX: probably we can unwrap here (if self.schema is correctly used)
132                let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134                let mut fields = Vec::with_capacity(struct_type_info.len());
135                for (field_name, field_type) in struct_type_info.iter() {
136                    if field_name == expected_field_name {
137                        let datum = self
138                            .convert_to_datum(variant_schema, v, field_type)?
139                            .to_owned_datum();
140
141                        fields.push(datum)
142                    } else {
143                        fields.push(None)
144                    }
145                }
146                StructValue::new(fields).into()
147            }
148            // nullable Union ([null, T])
149            (_, Value::Union(_, v)) => {
150                let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151                    return Err(create_error());
152                };
153                let Some(schema) = get_nullable_union_inner(u) else {
154                    return Err(create_error());
155                };
156                return self.convert_to_datum(schema, v, type_expected);
157            }
158            // ---- Boolean -----
159            (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160            // ---- Int16 -----
161            (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162            (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164            // ---- Int32 -----
165            (DataType::Int32, Value::Int(i)) => (*i).into(),
166            (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167            // ---- Int64 -----
168            (DataType::Int64, Value::Long(i)) => (*i).into(),
169            (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170            // ---- Float32 -----
171            (DataType::Float32, Value::Float(i)) => (*i).into(),
172            (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173            // ---- Float64 -----
174            (DataType::Float64, Value::Double(i)) => (*i).into(),
175            (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176            // ---- Decimal -----
177            (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178                let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179                    Schema::Decimal(DecimalSchema {
180                        precision, scale, ..
181                    }) => (*precision, *scale),
182                    _ => Err(create_error())?,
183                };
184                let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185                    .map_err(|_| create_error())?;
186                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187            }
188            (DataType::Decimal, Value::Record(fields)) => {
189                // VariableScaleDecimal has fixed fields, scale(int) and value(bytes)
190                let find_in_records = |field_name: &str| {
191                    fields
192                        .iter()
193                        .find(|field| field.0 == field_name)
194                        .map(|field| &field.1)
195                        .ok_or_else(|| {
196                            uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197                        })
198                };
199                let scale = match find_in_records("scale")? {
200                    Value::Int(scale) => *scale,
201                    avro_value => bail_uncategorized!(
202                        "scale field in VariableScaleDecimal is not int, got {:?}",
203                        avro_value
204                    ),
205                };
206
207                let value: BigInt = match find_in_records("value")? {
208                    Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209                    avro_value => bail_uncategorized!(
210                        "value field in VariableScaleDecimal is not bytes, got {:?}",
211                        avro_value
212                    ),
213                };
214
215                let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216                ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217            }
218            // ---- Time -----
219            (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220                .map_err(|_| create_error())?
221                .into(),
222            (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223                .map_err(|_| create_error())?
224                .into(),
225            // ---- Date -----
226            (DataType::Date, Value::Date(days)) => {
227                Date::with_days_since_ce(days + unix_epoch_days())
228                    .map_err(|_| create_error())?
229                    .into()
230            }
231            // ---- Varchar -----
232            (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233            (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234            // ---- Timestamp -----
235            (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236                .map_err(|_| create_error())?
237                .into(),
238            (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239                .map_err(|_| create_error())?
240                .into(),
241
242            // ---- TimestampTz -----
243            (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244                .ok_or_else(|| {
245                    uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246                })?
247                .into(),
248            (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249                Timestamptz::from_micros(*us).into()
250            }
251
252            // ---- Interval -----
253            (DataType::Interval, Value::Duration(duration)) => {
254                let months = u32::from(duration.months()) as i32;
255                let days = u32::from(duration.days()) as i32;
256                let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
257                ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258            }
259            // ---- Struct -----
260            (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261                let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262                    return Err(create_error());
263                };
264                struct_type_info
265                    .iter()
266                    .map(|(field_name, field_type)| {
267                        if let Some(idx) = record_schema.lookup.get(field_name) {
268                            let value = &descs[*idx].1;
269                            let schema = &record_schema.fields[*idx].schema;
270                            Ok(self
271                                .convert_to_datum(schema, value, field_type)?
272                                .to_owned_datum())
273                        } else {
274                            Ok(None)
275                        }
276                    })
277                    .collect::<Result<_, AccessError>>()?
278            })
279            .into(),
280            // ---- List -----
281            (DataType::List(item_type), Value::Array(array)) => ListValue::new({
282                let Schema::Array(element_schema) = self.lookup_ref(unresolved_schema) else {
283                    return Err(create_error());
284                };
285                let schema = element_schema;
286                let mut builder = item_type.create_array_builder(array.len());
287                for v in array {
288                    let value = self.convert_to_datum(schema, v, item_type)?;
289                    builder.append(value);
290                }
291                builder.finish()
292            })
293            .into(),
294            // ---- Bytea -----
295            (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
296            // ---- Jsonb -----
297            (DataType::Jsonb, v @ Value::Map(_)) => {
298                let mut builder = jsonbb::Builder::default();
299                avro_to_jsonb(v, &mut builder)?;
300                let jsonb = builder.finish();
301                debug_assert!(jsonb.as_ref().is_object());
302                JsonbVal::from(jsonb).into()
303            }
304            (DataType::Varchar, Value::Uuid(uuid)) => {
305                uuid.as_hyphenated().to_string().into_boxed_str().into()
306            }
307            (DataType::Map(map_type), Value::Map(map)) => {
308                let Schema::Map(value_schema) = self.lookup_ref(unresolved_schema) else {
309                    return Err(create_error());
310                };
311                let schema = value_schema;
312                let mut builder = map_type
313                    .clone()
314                    .into_struct()
315                    .create_array_builder(map.len());
316                // Since the map is HashMap, we can ensure
317                // key is non-null and unique, keys and values have the same length.
318
319                // NOTE: HashMap's iter order is non-deterministic, but MapValue's
320                // order matters. We sort by key here to have deterministic order
321                // in tests. We might consider removing this, or make all MapValue sorted
322                // in the future.
323                for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
324                    let value_datum = self
325                        .convert_to_datum(schema, v, map_type.value())?
326                        .to_owned_datum();
327                    builder.append(
328                        StructValue::new(vec![Some(k.as_str().into()), value_datum])
329                            .to_owned_datum(),
330                    );
331                }
332                let list = ListValue::new(builder.finish());
333                MapValue::from_entries(list).into()
334            }
335
336            (_expected, _got) => Err(create_error())?,
337        };
338        Ok(DatumCow::Owned(Some(v)))
339    }
340}
341
342pub struct AvroAccess<'a> {
343    value: &'a Value,
344    options: AvroParseOptions<'a>,
345}
346
347impl<'a> AvroAccess<'a> {
348    pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
349        Self {
350            value: root_value,
351            options,
352        }
353    }
354}
355
356impl Access for AvroAccess<'_> {
357    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
358        let mut value = self.value;
359        let mut unresolved_schema = self.options.root_schema;
360
361        debug_assert!(
362            path.len() == 1
363                || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
364            "unexpected path access: {:?}",
365            path
366        );
367        let mut i = 0;
368        while i < path.len() {
369            let key = path[i];
370            let create_error = || AccessError::Undefined {
371                name: key.to_owned(),
372                path: path.iter().take(i).join("."),
373            };
374            match value {
375                Value::Union(_, v) => {
376                    // The debezium "before" field is a nullable union.
377                    // "fields": [
378                    // {
379                    //     "name": "before",
380                    //     "type": [
381                    //         "null",
382                    //         {
383                    //             "type": "record",
384                    //             "name": "Value",
385                    //             "fields": [...],
386                    //         }
387                    //     ],
388                    //     "default": null
389                    // },
390                    // {
391                    //     "name": "after",
392                    //     "type": [
393                    //         "null",
394                    //         "Value"
395                    //     ],
396                    //     "default": null
397                    // },
398                    // ...]
399                    value = v;
400                    let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
401                        return Err(create_error());
402                    };
403                    let Some(schema) = get_nullable_union_inner(u) else {
404                        return Err(create_error());
405                    };
406                    unresolved_schema = schema;
407                    continue;
408                }
409                Value::Record(fields) => {
410                    let Schema::Record(record_schema) =
411                        self.options.inner.lookup_ref(unresolved_schema)
412                    else {
413                        return Err(create_error());
414                    };
415                    if let Some(idx) = record_schema.lookup.get(key) {
416                        value = &fields[*idx].1;
417                        unresolved_schema = &record_schema.fields[*idx].schema;
418                        i += 1;
419                        continue;
420                    }
421                }
422                _ => (),
423            }
424            Err(create_error())?;
425        }
426
427        self.options
428            .inner
429            .convert_to_datum(unresolved_schema, value, type_expected)
430    }
431}
432
433/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`.
434pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
435    let variants = union_schema.variants();
436    // Note: `[null, null] is invalid`, we don't need to worry about that.
437    if variants.len() == 2 && variants.contains(&Schema::Null) {
438        let inner_schema = variants
439            .iter()
440            .find(|s| !matches!(s, &&Schema::Null))
441            .unwrap();
442        Some(inner_schema)
443    } else {
444        None
445    }
446}
447
448pub(crate) fn unix_epoch_days() -> i32 {
449    Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
450}
451
452pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
453    match avro {
454        Value::Null => builder.add_null(),
455        Value::Boolean(b) => builder.add_bool(*b),
456        Value::Int(i) => builder.add_i64(*i as i64),
457        Value::String(s) => builder.add_string(s),
458        Value::Map(m) => {
459            builder.begin_object();
460            for (k, v) in m {
461                builder.add_string(k);
462                avro_to_jsonb(v, builder)?;
463            }
464            builder.end_object()
465        }
466        // same representation as map
467        Value::Record(r) => {
468            builder.begin_object();
469            for (k, v) in r {
470                builder.add_string(k);
471                avro_to_jsonb(v, builder)?;
472            }
473            builder.end_object()
474        }
475        Value::Array(a) => {
476            builder.begin_array();
477            for v in a {
478                avro_to_jsonb(v, builder)?;
479            }
480            builder.end_array()
481        }
482
483        // TODO: figure out where the following encoding is reasonable before enabling them.
484        // See discussions: https://github.com/risingwavelabs/risingwave/pull/16948
485
486        // jsonbb supports int64, but JSON spec does not allow it. How should we handle it?
487        // BTW, protobuf canonical JSON converts int64 to string.
488        // Value::Long(l) => builder.add_i64(*l),
489        // Value::Float(f) => {
490        //     if f.is_nan() || f.is_infinite() {
491        //         // XXX: pad null or return err here?
492        //         builder.add_null()
493        //     } else {
494        //         builder.add_f64(*f as f64)
495        //     }
496        // }
497        // Value::Double(f) => {
498        //     if f.is_nan() || f.is_infinite() {
499        //         // XXX: pad null or return err here?
500        //         builder.add_null()
501        //     } else {
502        //         builder.add_f64(*f)
503        //     }
504        // }
505        // // XXX: What encoding to use?
506        // // ToText is \x plus hex string.
507        // Value::Bytes(b) => builder.add_string(&ToText::to_text(&b.as_slice())),
508        // Value::Enum(_, symbol) => {
509        //     builder.add_string(&symbol);
510        // }
511        // Value::Uuid(id) => builder.add_string(&id.as_hyphenated().to_string()),
512        // // For Union, one concern is that the avro union is tagged (like rust enum) but json union is untagged (like c union).
513        // // When the union consists of multiple records, it is possible to distinguish which variant is active in avro, but in json they will all become jsonb objects and indistinguishable.
514        // Value::Union(_, v) => avro_to_jsonb(v, builder)?
515        // XXX: pad null or return err here?
516        v @ (Value::Long(_)
517        | Value::Float(_)
518        | Value::Double(_)
519        | Value::Bytes(_)
520        | Value::Enum(_, _)
521        | Value::Fixed(_, _)
522        | Value::Date(_)
523        | Value::Decimal(_)
524        | Value::TimeMillis(_)
525        | Value::TimeMicros(_)
526        | Value::TimestampMillis(_)
527        | Value::TimestampMicros(_)
528        | Value::LocalTimestampMillis(_)
529        | Value::LocalTimestampMicros(_)
530        | Value::Duration(_)
531        | Value::Uuid(_)
532        | Value::Union(_, _)) => {
533            bail_uncategorized!(
534                "unimplemented conversion from avro to jsonb: {:?}",
535                ValueKind::from(v)
536            )
537        }
538    }
539    Ok(())
540}
541
542#[cfg(test)]
543mod tests {
544    use std::str::FromStr;
545
546    use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
547    use expect_test::expect;
548    use risingwave_common::types::{Datum, Decimal};
549
550    use super::*;
551
552    /// Test the behavior of the Rust Avro lib for handling union with logical type.
553    #[test]
554    fn test_avro_lib_union() {
555        // duplicate types
556        let s = Schema::parse_str(r#"["null", "null"]"#);
557        expect![[r#"
558            Err(
559                Unions cannot contain duplicate types,
560            )
561        "#]]
562        .assert_debug_eq(&s);
563        let s = Schema::parse_str(r#"["int", "int"]"#);
564        expect![[r#"
565            Err(
566                Unions cannot contain duplicate types,
567            )
568        "#]]
569        .assert_debug_eq(&s);
570        // multiple map/array are considered as the same type, regardless of the element type!
571        let s = Schema::parse_str(
572            r#"[
573"null",
574{
575    "type": "map",
576    "values" : "long",
577    "default": {}
578},
579{
580    "type": "map",
581    "values" : "int",
582    "default": {}
583}
584]
585"#,
586        );
587        expect![[r#"
588            Err(
589                Unions cannot contain duplicate types,
590            )
591        "#]]
592        .assert_debug_eq(&s);
593        let s = Schema::parse_str(
594            r#"[
595"null",
596{
597    "type": "array",
598    "items" : "long",
599    "default": {}
600},
601{
602    "type": "array",
603    "items" : "int",
604    "default": {}
605}
606]
607"#,
608        );
609        expect![[r#"
610        Err(
611            Unions cannot contain duplicate types,
612        )
613    "#]]
614        .assert_debug_eq(&s);
615        // multiple named types
616        let s = Schema::parse_str(
617            r#"[
618"null",
619{"type":"fixed","name":"a","size":16},
620{"type":"fixed","name":"b","size":32}
621]
622"#,
623        );
624        expect![[r#"
625            Ok(
626                Union(
627                    UnionSchema {
628                        schemas: [
629                            Null,
630                            Fixed(
631                                FixedSchema {
632                                    name: Name {
633                                        name: "a",
634                                        namespace: None,
635                                    },
636                                    aliases: None,
637                                    doc: None,
638                                    size: 16,
639                                    attributes: {},
640                                },
641                            ),
642                            Fixed(
643                                FixedSchema {
644                                    name: Name {
645                                        name: "b",
646                                        namespace: None,
647                                    },
648                                    aliases: None,
649                                    doc: None,
650                                    size: 32,
651                                    attributes: {},
652                                },
653                            ),
654                        ],
655                        variant_index: {
656                            Null: 0,
657                        },
658                    },
659                ),
660            )
661        "#]]
662        .assert_debug_eq(&s);
663
664        // union in union
665        let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
666        expect![[r#"
667            Err(
668                Unions may not directly contain a union,
669            )
670        "#]]
671        .assert_debug_eq(&s);
672
673        // logical type
674        let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
675        expect![[r#"
676            Union(
677                UnionSchema {
678                    schemas: [
679                        Null,
680                        Uuid,
681                    ],
682                    variant_index: {
683                        Null: 0,
684                        Uuid: 1,
685                    },
686                },
687            )
688        "#]]
689        .assert_debug_eq(&s);
690        // Note: Java Avro lib rejects this (logical type unions with its physical type)
691        let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
692        expect![[r#"
693            Union(
694                UnionSchema {
695                    schemas: [
696                        String,
697                        Uuid,
698                    ],
699                    variant_index: {
700                        String: 0,
701                        Uuid: 1,
702                    },
703                },
704            )
705        "#]]
706        .assert_debug_eq(&s);
707        // Note: Java Avro lib rejects this (logical type unions with its physical type)
708        let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
709        expect![[r#"
710            Union(
711                UnionSchema {
712                    schemas: [
713                        Int,
714                        Date,
715                    ],
716                    variant_index: {
717                        Int: 0,
718                        Date: 1,
719                    },
720                },
721            )
722        "#]]
723        .assert_debug_eq(&s);
724        // Note: Java Avro lib allows this (2 decimal with different "name")
725        let s = Schema::parse_str(
726            r#"[
727{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
728{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
729]"#,
730        );
731        expect![[r#"
732            Err(
733                Unions cannot contain duplicate types,
734            )
735        "#]]
736        .assert_debug_eq(&s);
737    }
738
#[test]
fn test_avro_lib_union_record_bug() {
    // Part 1: a nullable union of three record types. `Fax` and `Sms` have the same
    // field layout (one `int` named `inner`) and differ only by their record name.
    let union_schema = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "unionTypeComplex",
          "type": [
            "null",
            {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
            {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
            {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
          ]
        }
      ]
    }
        "#,
    )
    .unwrap();

    // 0x06 decodes to union branch 3 (`Sms`) and 0x0c to the int 6, as the snapshots show.
    let encoded = hex::decode("060c").unwrap();

    // Decoding with only the writer schema selects the expected branch: 3 (`Sms`).
    let writer_only = from_avro_datum(&union_schema, &mut encoded.as_slice(), None);
    expect![[r#"
                Ok(
                    Record(
                        [
                            (
                                "unionTypeComplex",
                                Union(
                                    3,
                                    Record(
                                        [
                                            (
                                                "inner",
                                                Int(
                                                    6,
                                                ),
                                            ),
                                        ],
                                    ),
                                ),
                            ),
                        ],
                    ),
                )
            "#]]
    .assert_debug_eq(&writer_only);

    // Bug: passing the very same schema as the reader schema yields branch 2 (`Fax`).
    let with_reader = from_avro_datum(
        &union_schema,
        &mut encoded.as_slice(),
        Some(&union_schema),
    );
    expect![[r#"
                Ok(
                    Record(
                        [
                            (
                                "unionTypeComplex",
                                Union(
                                    2,
                                    Record(
                                        [
                                            (
                                                "inner",
                                                Int(
                                                    6,
                                                ),
                                            ),
                                        ],
                                    ),
                                ),
                            ),
                        ],
                    ),
                )
            "#]]
    .assert_debug_eq(&with_reader);

    // Part 2: the root cause, reproduced in isolation.
    // `from_avro_datum` first decodes the bytes with the writer schema and then
    // interprets the decoded value using only the reader schema. A decoded record value
    // carries no record name, so the reader's record check cannot compare names, and the
    // conversion below wrongly succeeds. Per the spec, two record schemas only match when
    // they have the same (unqualified) name:
    // https://avro.apache.org/docs/1.11.1/specification/_print/#schema-resolution
    // A correct implementation must consult both writer and reader schema in that step.
    let writer_schema = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "a",
          "type": "int"
        }
      ]
    }
        "#,
    )
    .unwrap();
    let reader_schema = Schema::parse_str(
        r#"
{
  "type": "record",
  "name": "Root222",
  "fields": [
    {
      "name": "a",
      "type": "int"
    }
  ]
}
    "#,
    )
    .unwrap();

    let encoded = hex::decode("0c").unwrap();
    let resolved = from_avro_datum(
        &writer_schema,
        &mut encoded.as_slice(),
        Some(&reader_schema),
    );
    expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "a",
                            Int(
                                6,
                            ),
                        ),
                    ],
                ),
            )
        "#]]
    .assert_debug_eq(&resolved);
}
876
#[test]
fn test_convert_decimal() {
    // Expected values are built with `Decimal::new(unscaled, scale)` (or parsed from a
    // string) so every assertion compares against an exact decimal, not against the
    // rounded result of an f64 -> Decimal conversion.

    // Big-endian bytes [0x01, 0x18] = 280; scale 0 -> 280.
    let avro_decimal = AvroDecimal::from(vec![1, 24]);
    let converted = scaled_bigint_to_rust_decimal(avro_decimal.into(), 0).unwrap();
    assert_eq!(converted, rust_decimal::Decimal::from(280));

    // [0x01, 0x19] = 281; scale 1 -> 28.1.
    let avro_decimal = AvroDecimal::from(vec![1, 25]);
    let converted = scaled_bigint_to_rust_decimal(avro_decimal.into(), 1).unwrap();
    assert_eq!(converted, rust_decimal::Decimal::new(281, 1));

    // 11234567891; scale 10 -> 1.1234567891.
    let unscaled = BigInt::from(11_234_567_891_i64);
    let converted = scaled_bigint_to_rust_decimal(unscaled, 10).unwrap();
    assert_eq!(converted, rust_decimal::Decimal::new(11_234_567_891, 10));

    // 12-byte unscaled value; scale 27 -> 1.123456789123456789123456789.
    let avro_decimal =
        AvroDecimal::from(vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21]);
    let converted = scaled_bigint_to_rust_decimal(avro_decimal.into(), 27).unwrap();
    assert_eq!(
        converted,
        rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
    );
}
908
909    /// Convert Avro value to datum.For now, support the following [Avro type](https://avro.apache.org/docs/current/spec.html).
910    ///  - boolean
911    ///  - int : i32
912    ///  - long: i64
913    ///  - float: f32
914    ///  - double: f64
915    ///  - string: String
916    ///  - Date (the number of days from the unix epoch, 1970-1-1 UTC)
917    ///  - Timestamp (the number of milliseconds from the unix epoch,  1970-1-1 00:00:00.000 UTC)
918    fn from_avro_value(
919        value: Value,
920        value_schema: &Schema,
921        shape: &DataType,
922    ) -> anyhow::Result<Datum> {
923        Ok(AvroParseOptions::create(value_schema)
924            .inner
925            .convert_to_datum(value_schema, &value, shape)?
926            .to_owned_datum())
927    }
928
#[test]
fn test_avro_timestamptz_micros() {
    // Both the microsecond and millisecond encodings below represent the same instant.
    let expected = Some(ScalarImpl::Timestamptz(
        Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap(),
    ));
    let cases = [
        (
            Value::TimestampMicros(1_620_000_000_000_000),
            Schema::TimestampMicros,
        ),
        (
            Value::TimestampMillis(1_620_000_000_000),
            Schema::TimestampMillis,
        ),
    ];
    for (avro_value, avro_schema) in cases {
        let datum = from_avro_value(avro_value, &avro_schema, &DataType::Timestamptz).unwrap();
        assert_eq!(datum, expected);
    }
}
950
#[test]
fn test_decimal_truncate() {
    // A `bytes`-backed decimal logical type with precision 38 and scale 18.
    let schema = Schema::parse_str(
        r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 38,
                "scale": 18
            }
            "#,
    )
    .unwrap();

    // Seven 0x3f bytes form the unscaled integer 0x3f3f3f3f3f3f3f = 17802464409370431;
    // at scale 18 that is 0.017802464409370431.
    let avro_value = Value::Decimal(AvroDecimal::from(vec![0x3f; 7]));
    let expected = rust_decimal::Decimal::from_str("0.017802464409370431").unwrap();

    let datum = from_avro_value(avro_value, &schema, &DataType::Decimal).unwrap();
    assert_eq!(
        datum,
        Some(ScalarImpl::Decimal(Decimal::Normalized(expected)))
    );
}
974
#[test]
fn test_variable_scale_decimal() {
    // Debezium-style `VariableScaleDecimal` record: an `int` scale and a `bytes` value.
    let schema = Schema::parse_str(
        r#"
            {
                "type": "record",
                "name": "VariableScaleDecimal",
                "namespace": "io.debezium.data",
                "fields": [
                    {
                        "name": "scale",
                        "type": "int"
                    },
                    {
                        "name": "value",
                        "type": "bytes"
                    }
                ]
            }
            "#,
    )
    .unwrap();

    let fields = [
        ("scale", Value::Int(0)),
        ("value", Value::Bytes(vec![0x01, 0x02, 0x03])),
    ];
    let record = Value::Record(
        fields
            .into_iter()
            .map(|(field_name, field_value)| (field_name.to_owned(), field_value))
            .collect(),
    );

    // Bytes 0x010203 = 66051, with scale 0.
    let datum = from_avro_value(record, &schema, &DataType::Decimal).unwrap();
    assert_eq!(datum, Some(ScalarImpl::Decimal(Decimal::from(66051))));
}
1005}