1mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25 DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26 Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
/// Options for decoding Avro [`Value`]s that were written against a single root schema.
///
/// Built with `AvroParseOptions::create`.
#[derive(Clone)]
pub struct AvroParseOptions<'a> {
    /// The schema of the top-level value. `AvroAccess::access` starts walking from here.
    root_schema: &'a Schema,
    /// The state consulted while converting values: the named-schema table and the
    /// numeric-relaxation flag.
    inner: AvroParseOptionsInner<'a>,
}
42
/// The part of the parse options that `convert_to_datum` reads while converting a value.
#[derive(Clone)]
struct AvroParseOptionsInner<'a> {
    /// Lookup table from schema name to schema, used by `lookup_ref` to resolve
    /// `Schema::Ref` nodes.
    refs: NamesRef<'a>,
    /// When `true`, an Avro `int`/`long` may be cast to an integer column of a different
    /// width (see the `Int16`/`Int32`/`Int64` arms of `convert_to_datum`).
    /// `AvroParseOptions::create` always sets this to `true`.
    relax_numeric: bool,
}
52
53impl<'a> AvroParseOptions<'a> {
54 pub fn create(root_schema: &'a Schema) -> Self {
55 let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56 .expect("avro schema is self contained");
57 Self {
58 root_schema,
59 inner: AvroParseOptionsInner {
60 refs: resolved.get_names().clone(),
61 relax_numeric: true,
62 },
63 }
64 }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68 fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69 match schema {
70 Schema::Ref { name } => self.refs[name],
71 _ => schema,
72 }
73 }
74
75 fn convert_to_datum<'b>(
89 &self,
90 unresolved_schema: &'a Schema,
91 value: &'b Value,
92 type_expected: &DataType,
93 ) -> AccessResult<DatumCow<'b>>
94 where
95 'b: 'a,
96 {
97 let create_error = || AccessError::TypeError {
98 expected: format!("{:?}", type_expected),
99 got: format!("{:?}", value),
100 value: String::new(),
101 };
102
103 macro_rules! borrowed {
104 ($v:expr) => {
105 return Ok(DatumCow::Borrowed(Some($v.into())))
106 };
107 }
108
109 let v: ScalarImpl = match (type_expected, value) {
110 (_, Value::Null) => return Ok(DatumCow::NULL),
111 (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114 return Err(create_error());
116 };
117
118 if let Some(inner) = get_nullable_union_inner(u) {
119 return self.convert_to_datum(inner, v, type_expected);
121 }
122 let variant_schema = &u.variants()[*variant as usize];
123
124 if matches!(variant_schema, &Schema::Null) {
125 return Ok(DatumCow::NULL);
126 }
127
128 let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134 let mut fields = Vec::with_capacity(struct_type_info.len());
135 for (field_name, field_type) in struct_type_info.iter() {
136 if field_name == expected_field_name {
137 let datum = self
138 .convert_to_datum(variant_schema, v, field_type)?
139 .to_owned_datum();
140
141 fields.push(datum)
142 } else {
143 fields.push(None)
144 }
145 }
146 StructValue::new(fields).into()
147 }
148 (_, Value::Union(_, v)) => {
150 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151 return Err(create_error());
152 };
153 let Some(schema) = get_nullable_union_inner(u) else {
154 return Err(create_error());
155 };
156 return self.convert_to_datum(schema, v, type_expected);
157 }
158 (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160 (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162 (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164 (DataType::Int32, Value::Int(i)) => (*i).into(),
166 (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167 (DataType::Int64, Value::Long(i)) => (*i).into(),
169 (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170 (DataType::Float32, Value::Float(i)) => (*i).into(),
172 (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173 (DataType::Float64, Value::Double(i)) => (*i).into(),
175 (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176 (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178 let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179 Schema::Decimal(DecimalSchema {
180 precision, scale, ..
181 }) => (*precision, *scale),
182 _ => Err(create_error())?,
183 };
184 let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185 .map_err(|_| create_error())?;
186 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187 }
188 (DataType::Decimal, Value::Record(fields)) => {
189 let find_in_records = |field_name: &str| {
191 fields
192 .iter()
193 .find(|field| field.0 == field_name)
194 .map(|field| &field.1)
195 .ok_or_else(|| {
196 uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197 })
198 };
199 let scale = match find_in_records("scale")? {
200 Value::Int(scale) => *scale,
201 avro_value => bail_uncategorized!(
202 "scale field in VariableScaleDecimal is not int, got {:?}",
203 avro_value
204 ),
205 };
206
207 let value: BigInt = match find_in_records("value")? {
208 Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209 avro_value => bail_uncategorized!(
210 "value field in VariableScaleDecimal is not bytes, got {:?}",
211 avro_value
212 ),
213 };
214
215 let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217 }
218 (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220 .map_err(|_| create_error())?
221 .into(),
222 (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223 .map_err(|_| create_error())?
224 .into(),
225 (DataType::Date, Value::Date(days)) => {
227 Date::with_days_since_ce(days + unix_epoch_days())
228 .map_err(|_| create_error())?
229 .into()
230 }
231 (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233 (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234 (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236 .map_err(|_| create_error())?
237 .into(),
238 (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239 .map_err(|_| create_error())?
240 .into(),
241
242 (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244 .ok_or_else(|| {
245 uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246 })?
247 .into(),
248 (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249 Timestamptz::from_micros(*us).into()
250 }
251
252 (DataType::Interval, Value::Duration(duration)) => {
254 let months = u32::from(duration.months()) as i32;
255 let days = u32::from(duration.days()) as i32;
256 let usecs = (u32::from(duration.millis()) as i64) * 1000; ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258 }
259 (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261 let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262 return Err(create_error());
263 };
264 struct_type_info
265 .iter()
266 .map(|(field_name, field_type)| {
267 if let Some(idx) = record_schema.lookup.get(field_name) {
268 let value = &descs[*idx].1;
269 let schema = &record_schema.fields[*idx].schema;
270 Ok(self
271 .convert_to_datum(schema, value, field_type)?
272 .to_owned_datum())
273 } else {
274 Ok(None)
275 }
276 })
277 .collect::<Result<_, AccessError>>()?
278 })
279 .into(),
280 (DataType::List(list_type), Value::Array(array)) => ListValue::new({
282 let Schema::Array(element_schema) = self.lookup_ref(unresolved_schema) else {
283 return Err(create_error());
284 };
285 let schema = element_schema;
286 let elem_type = list_type.elem();
287 let mut builder = elem_type.create_array_builder(array.len());
288 for v in array {
289 let value = self.convert_to_datum(schema, v, elem_type)?;
290 builder.append(value);
291 }
292 builder.finish()
293 })
294 .into(),
295 (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
297 (DataType::Jsonb, v @ Value::Map(_)) => {
299 let mut builder = jsonbb::Builder::default();
300 avro_to_jsonb(v, &mut builder)?;
301 let jsonb = builder.finish();
302 debug_assert!(jsonb.as_ref().is_object());
303 JsonbVal::from(jsonb).into()
304 }
305 (DataType::Varchar, Value::Uuid(uuid)) => {
306 uuid.as_hyphenated().to_string().into_boxed_str().into()
307 }
308 (DataType::Map(map_type), Value::Map(map)) => {
309 let Schema::Map(value_schema) = self.lookup_ref(unresolved_schema) else {
310 return Err(create_error());
311 };
312 let schema = value_schema;
313 let mut builder = map_type
314 .clone()
315 .into_struct()
316 .create_array_builder(map.len());
317 for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
325 let value_datum = self
326 .convert_to_datum(schema, v, map_type.value())?
327 .to_owned_datum();
328 builder.append(
329 StructValue::new(vec![Some(k.as_str().into()), value_datum])
330 .to_owned_datum(),
331 );
332 }
333 let list = ListValue::new(builder.finish());
334 MapValue::from_entries(list).into()
335 }
336
337 (_expected, _got) => Err(create_error())?,
338 };
339 Ok(DatumCow::Owned(Some(v)))
340 }
341}
342
/// An [`Access`] implementation over a decoded Avro value.
pub struct AvroAccess<'a> {
    /// The top-level decoded value that paths are resolved against.
    value: &'a Value,
    /// The root schema of `value` plus the options used when converting to datums.
    options: AvroParseOptions<'a>,
}
347
348impl<'a> AvroAccess<'a> {
349 pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
350 Self {
351 value: root_value,
352 options,
353 }
354 }
355}
356
357impl Access for AvroAccess<'_> {
358 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
359 let mut value = self.value;
360 let mut unresolved_schema = self.options.root_schema;
361
362 debug_assert!(
363 path.len() == 1
364 || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
365 "unexpected path access: {:?}",
366 path
367 );
368 let mut i = 0;
369 while i < path.len() {
370 let key = path[i];
371 let create_error = || AccessError::Undefined {
372 name: key.to_owned(),
373 path: path.iter().take(i).join("."),
374 };
375 match value {
376 Value::Union(_, v) => {
377 value = v;
401 let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
402 return Err(create_error());
403 };
404 let Some(schema) = get_nullable_union_inner(u) else {
405 return Err(create_error());
406 };
407 unresolved_schema = schema;
408 continue;
409 }
410 Value::Record(fields) => {
411 let Schema::Record(record_schema) =
412 self.options.inner.lookup_ref(unresolved_schema)
413 else {
414 return Err(create_error());
415 };
416 if let Some(idx) = record_schema.lookup.get(key) {
417 value = &fields[*idx].1;
418 unresolved_schema = &record_schema.fields[*idx].schema;
419 i += 1;
420 continue;
421 }
422 }
423 _ => (),
424 }
425 Err(create_error())?;
426 }
427
428 self.options
429 .inner
430 .convert_to_datum(unresolved_schema, value, type_expected)
431 }
432}
433
434pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
436 let variants = union_schema.variants();
437 if variants.len() == 2 && variants.contains(&Schema::Null) {
439 let inner_schema = variants
440 .iter()
441 .find(|s| !matches!(s, &&Schema::Null))
442 .unwrap();
443 Some(inner_schema)
444 } else {
445 None
446 }
447}
448
449pub(crate) fn unix_epoch_days() -> i32 {
450 Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
451}
452
453pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
454 match avro {
455 Value::Null => builder.add_null(),
456 Value::Boolean(b) => builder.add_bool(*b),
457 Value::Int(i) => builder.add_i64(*i as i64),
458 Value::String(s) => builder.add_string(s),
459 Value::Map(m) => {
460 builder.begin_object();
461 for (k, v) in m {
462 builder.add_string(k);
463 avro_to_jsonb(v, builder)?;
464 }
465 builder.end_object()
466 }
467 Value::Record(r) => {
469 builder.begin_object();
470 for (k, v) in r {
471 builder.add_string(k);
472 avro_to_jsonb(v, builder)?;
473 }
474 builder.end_object()
475 }
476 Value::Array(a) => {
477 builder.begin_array();
478 for v in a {
479 avro_to_jsonb(v, builder)?;
480 }
481 builder.end_array()
482 }
483
484 v @ (Value::Long(_)
518 | Value::Float(_)
519 | Value::Double(_)
520 | Value::Bytes(_)
521 | Value::Enum(_, _)
522 | Value::Fixed(_, _)
523 | Value::Date(_)
524 | Value::Decimal(_)
525 | Value::TimeMillis(_)
526 | Value::TimeMicros(_)
527 | Value::TimestampMillis(_)
528 | Value::TimestampMicros(_)
529 | Value::LocalTimestampMillis(_)
530 | Value::LocalTimestampMicros(_)
531 | Value::Duration(_)
532 | Value::Uuid(_)
533 | Value::Union(_, _)) => {
534 bail_uncategorized!(
535 "unimplemented conversion from avro to jsonb: {:?}",
536 ValueKind::from(v)
537 )
538 }
539 }
540 Ok(())
541}
542
543#[cfg(test)]
544mod tests {
545 use std::str::FromStr;
546
547 use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
548 use expect_test::expect;
549 use risingwave_common::types::{Datum, Decimal};
550
551 use super::*;
552
    /// Pins down which union schemas the Avro library accepts and which it rejects, via
    /// snapshots of `Schema::parse_str` results.
    #[test]
    fn test_avro_lib_union() {
        // Duplicate `null` is rejected.
        let s = Schema::parse_str(r#"["null", "null"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Duplicate primitive type is rejected.
        let s = Schema::parse_str(r#"["int", "int"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two maps are duplicates of each other even though their value types differ.
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "map",
    "values" : "long",
    "default": {}
},
{
    "type": "map",
    "values" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Likewise for two arrays with different item types.
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "array",
    "items" : "long",
    "default": {}
},
{
    "type": "array",
    "items" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two `fixed` types with different names are accepted. Note that `variant_index`
        // in the snapshot only has an entry for `Null`.
        let s = Schema::parse_str(
            r#"[
"null",
{"type":"fixed","name":"a","size":16},
{"type":"fixed","name":"b","size":32}
]
"#,
        );
        expect![[r#"
            Ok(
                Union(
                    UnionSchema {
                        schemas: [
                            Null,
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "a",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 16,
                                    attributes: {},
                                },
                            ),
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "b",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 32,
                                    attributes: {},
                                },
                            ),
                        ],
                        variant_index: {
                            Null: 0,
                        },
                    },
                ),
            )
        "#]]
        .assert_debug_eq(&s);

        // A union nested directly inside a union is rejected.
        let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
        expect![[r#"
            Err(
                Unions may not directly contain a union,
            )
        "#]]
        .assert_debug_eq(&s);

        // Logical types: `uuid` parses to its own `Uuid` schema inside a nullable union.
        let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Null,
                        Uuid,
                    ],
                    variant_index: {
                        Null: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // `string` and `uuid` (a string logical type) are accepted side by side.
        let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        String,
                        Uuid,
                    ],
                    variant_index: {
                        String: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // `int` and `date` (an int logical type) are accepted side by side.
        let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Int,
                        Date,
                    ],
                    variant_index: {
                        Int: 0,
                        Date: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Two decimal logical types over differently named `fixed` types are rejected as
        // duplicates, unlike the plain `fixed` case above.
        let s = Schema::parse_str(
            r#"[
{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
]"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
    }
739
    /// Records how `from_avro_datum` behaves when a reader schema is supplied: for a union
    /// of records it reports a different variant index than decoding with the writer schema
    /// alone.
    #[test]
    fn test_avro_lib_union_record_bug() {
        // A union of three records; `Fax` and `Sms` have identical field layouts.
        let s = Schema::parse_str(
            r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "unionTypeComplex",
          "type": [
            "null",
            {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
            {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
            {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
          ]
        }
      ]
    }
            "#,
        )
        .unwrap();

        let bytes = hex::decode("060c").unwrap();
        // Writer schema only: the value decodes as variant 3 (`Sms`) with `inner = 6`.
        let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None);
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "unionTypeComplex",
                            Union(
                                3,
                                Record(
                                    [
                                        (
                                            "inner",
                                            Int(
                                                6,
                                            ),
                                        ),
                                    ],
                                ),
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&correct_value);
        // Same bytes with the same schema passed as reader schema: the variant index comes
        // back as 2 (`Fax`) instead of 3.
        let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s));
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "unionTypeComplex",
                            Union(
                                2,
                                Record(
                                    [
                                        (
                                            "inner",
                                            Int(
                                                6,
                                            ),
                                        ),
                                    ],
                                ),
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&wrong_value);

        // Writer and reader schemas that differ only in the record name ("Root" vs
        // "Root222"): decoding still succeeds.
        let s = Schema::parse_str(
            r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "a",
          "type": "int"
        }
      ]
    }
            "#,
        )
        .unwrap();
        let s2 = Schema::parse_str(
            r#"
{
  "type": "record",
  "name": "Root222",
  "fields": [
    {
      "name": "a",
      "type": "int"
    }
  ]
}
            "#,
        )
        .unwrap();

        let bytes = hex::decode("0c").unwrap();
        let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2));
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "a",
                            Int(
                                6,
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&value);
    }
877
878 #[test]
879 fn test_convert_decimal() {
880 let v = vec![1, 24];
882 let avro_decimal = AvroDecimal::from(v);
883 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 0).unwrap();
884 assert_eq!(rust_decimal, rust_decimal::Decimal::from(280));
885
886 let v = vec![1, 25];
888 let avro_decimal = AvroDecimal::from(v);
889 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 1).unwrap();
890 assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
891
892 let value = BigInt::from(11234567891_i64);
894 let decimal = scaled_bigint_to_rust_decimal(value, 10).unwrap();
895 assert_eq!(
896 decimal,
897 rust_decimal::Decimal::try_from(1.1234567891).unwrap()
898 );
899
900 let v = vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21];
902 let avro_decimal = AvroDecimal::from(v);
903 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 27).unwrap();
904 assert_eq!(
905 rust_decimal,
906 rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
907 );
908 }
909
910 fn from_avro_value(
920 value: Value,
921 value_schema: &Schema,
922 shape: &DataType,
923 ) -> anyhow::Result<Datum> {
924 Ok(AvroParseOptions::create(value_schema)
925 .inner
926 .convert_to_datum(value_schema, &value, shape)?
927 .to_owned_datum())
928 }
929
930 #[test]
931 fn test_avro_timestamptz_micros() {
932 let v1 = Value::TimestampMicros(1620000000000000);
933 let v2 = Value::TimestampMillis(1620000000000);
934 let value_schema1 = Schema::TimestampMicros;
935 let value_schema2 = Schema::TimestampMillis;
936 let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
937 let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
938 assert_eq!(
939 datum1,
940 Some(ScalarImpl::Timestamptz(
941 Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
942 ))
943 );
944 assert_eq!(
945 datum2,
946 Some(ScalarImpl::Timestamptz(
947 Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
948 ))
949 );
950 }
951
952 #[test]
953 fn test_decimal_truncate() {
954 let schema = Schema::parse_str(
955 r#"
956 {
957 "type": "bytes",
958 "logicalType": "decimal",
959 "precision": 38,
960 "scale": 18
961 }
962 "#,
963 )
964 .unwrap();
965 let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
966 let value = Value::Decimal(AvroDecimal::from(bytes));
967 let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
968 assert_eq!(
969 resp,
970 Some(ScalarImpl::Decimal(Decimal::Normalized(
971 rust_decimal::Decimal::from_str("0.017802464409370431").unwrap()
972 )))
973 );
974 }
975
976 #[test]
977 fn test_variable_scale_decimal() {
978 let schema = Schema::parse_str(
979 r#"
980 {
981 "type": "record",
982 "name": "VariableScaleDecimal",
983 "namespace": "io.debezium.data",
984 "fields": [
985 {
986 "name": "scale",
987 "type": "int"
988 },
989 {
990 "name": "value",
991 "type": "bytes"
992 }
993 ]
994 }
995 "#,
996 )
997 .unwrap();
998 let value = Value::Record(vec![
999 ("scale".to_owned(), Value::Int(0)),
1000 ("value".to_owned(), Value::Bytes(vec![0x01, 0x02, 0x03])),
1001 ]);
1002
1003 let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
1004 assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
1005 }
1006}