1mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25 DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26 Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
/// Options used when converting decoded Avro values into datums.
#[derive(Clone)]
pub struct AvroParseOptions<'a> {
    /// Schema of the top-level value; `AvroAccess::access` starts its walk from this schema.
    root_schema: &'a Schema,
    /// Named-schema lookup table and conversion flags used by `convert_to_datum`.
    inner: AvroParseOptionsInner<'a>,
}
42
/// The part of the parse options that stays the same while the schema is traversed.
#[derive(Clone)]
struct AvroParseOptionsInner<'a> {
    /// Named schemas, consulted by `lookup_ref` to resolve `Schema::Ref` nodes.
    refs: NamesRef<'a>,
    /// When `true`, `convert_to_datum` also accepts Avro `int`/`long` values for integer
    /// targets of a different width (e.g. `long` into `Int32`), converting with `as` casts.
    /// `AvroParseOptions::create` always sets this to `true`.
    relax_numeric: bool,
}
52
53impl<'a> AvroParseOptions<'a> {
54 pub fn create(root_schema: &'a Schema) -> Self {
55 let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56 .expect("avro schema is self contained");
57 Self {
58 root_schema,
59 inner: AvroParseOptionsInner {
60 refs: resolved.get_names().clone(),
61 relax_numeric: true,
62 },
63 }
64 }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68 fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69 match schema {
70 Schema::Ref { name } => self.refs[name],
71 _ => schema,
72 }
73 }
74
75 fn convert_to_datum<'b>(
89 &self,
90 unresolved_schema: &'a Schema,
91 value: &'b Value,
92 type_expected: &DataType,
93 ) -> AccessResult<DatumCow<'b>>
94 where
95 'b: 'a,
96 {
97 let create_error = || AccessError::TypeError {
98 expected: format!("{:?}", type_expected),
99 got: format!("{:?}", value),
100 value: String::new(),
101 };
102
103 macro_rules! borrowed {
104 ($v:expr) => {
105 return Ok(DatumCow::Borrowed(Some($v.into())))
106 };
107 }
108
109 let v: ScalarImpl = match (type_expected, value) {
110 (_, Value::Null) => return Ok(DatumCow::NULL),
111 (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114 return Err(create_error());
116 };
117
118 if let Some(inner) = get_nullable_union_inner(u) {
119 return self.convert_to_datum(inner, v, type_expected);
121 }
122 let variant_schema = &u.variants()[*variant as usize];
123
124 if matches!(variant_schema, &Schema::Null) {
125 return Ok(DatumCow::NULL);
126 }
127
128 let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134 let mut fields = Vec::with_capacity(struct_type_info.len());
135 for (field_name, field_type) in struct_type_info.iter() {
136 if field_name == expected_field_name {
137 let datum = self
138 .convert_to_datum(variant_schema, v, field_type)?
139 .to_owned_datum();
140
141 fields.push(datum)
142 } else {
143 fields.push(None)
144 }
145 }
146 StructValue::new(fields).into()
147 }
148 (_, Value::Union(_, v)) => {
150 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151 return Err(create_error());
152 };
153 let Some(schema) = get_nullable_union_inner(u) else {
154 return Err(create_error());
155 };
156 return self.convert_to_datum(schema, v, type_expected);
157 }
158 (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160 (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162 (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164 (DataType::Int32, Value::Int(i)) => (*i).into(),
166 (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167 (DataType::Int64, Value::Long(i)) => (*i).into(),
169 (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170 (DataType::Float32, Value::Float(i)) => (*i).into(),
172 (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173 (DataType::Float64, Value::Double(i)) => (*i).into(),
175 (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176 (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178 let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179 Schema::Decimal(DecimalSchema {
180 precision, scale, ..
181 }) => (*precision, *scale),
182 _ => Err(create_error())?,
183 };
184 let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185 .map_err(|_| create_error())?;
186 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187 }
188 (DataType::Decimal, Value::Record(fields)) => {
189 let find_in_records = |field_name: &str| {
191 fields
192 .iter()
193 .find(|field| field.0 == field_name)
194 .map(|field| &field.1)
195 .ok_or_else(|| {
196 uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197 })
198 };
199 let scale = match find_in_records("scale")? {
200 Value::Int(scale) => *scale,
201 avro_value => bail_uncategorized!(
202 "scale field in VariableScaleDecimal is not int, got {:?}",
203 avro_value
204 ),
205 };
206
207 let value: BigInt = match find_in_records("value")? {
208 Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209 avro_value => bail_uncategorized!(
210 "value field in VariableScaleDecimal is not bytes, got {:?}",
211 avro_value
212 ),
213 };
214
215 let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217 }
218 (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220 .map_err(|_| create_error())?
221 .into(),
222 (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223 .map_err(|_| create_error())?
224 .into(),
225 (DataType::Date, Value::Date(days)) => {
227 Date::with_days_since_ce(days + unix_epoch_days())
228 .map_err(|_| create_error())?
229 .into()
230 }
231 (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233 (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234 (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236 .map_err(|_| create_error())?
237 .into(),
238 (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239 .map_err(|_| create_error())?
240 .into(),
241
242 (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244 .ok_or_else(|| {
245 uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246 })?
247 .into(),
248 (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249 Timestamptz::from_micros(*us).into()
250 }
251
252 (DataType::Interval, Value::Duration(duration)) => {
254 let months = u32::from(duration.months()) as i32;
255 let days = u32::from(duration.days()) as i32;
256 let usecs = (u32::from(duration.millis()) as i64) * 1000; ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258 }
259 (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261 let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262 return Err(create_error());
263 };
264 struct_type_info
265 .iter()
266 .map(|(field_name, field_type)| {
267 if let Some(idx) = record_schema.lookup.get(field_name) {
268 let value = &descs[*idx].1;
269 let schema = &record_schema.fields[*idx].schema;
270 Ok(self
271 .convert_to_datum(schema, value, field_type)?
272 .to_owned_datum())
273 } else {
274 Ok(None)
275 }
276 })
277 .collect::<Result<_, AccessError>>()?
278 })
279 .into(),
280 (DataType::List(item_type), Value::Array(array)) => ListValue::new({
282 let Schema::Array(element_schema) = self.lookup_ref(unresolved_schema) else {
283 return Err(create_error());
284 };
285 let schema = element_schema;
286 let mut builder = item_type.create_array_builder(array.len());
287 for v in array {
288 let value = self.convert_to_datum(schema, v, item_type)?;
289 builder.append(value);
290 }
291 builder.finish()
292 })
293 .into(),
294 (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
296 (DataType::Jsonb, v @ Value::Map(_)) => {
298 let mut builder = jsonbb::Builder::default();
299 avro_to_jsonb(v, &mut builder)?;
300 let jsonb = builder.finish();
301 debug_assert!(jsonb.as_ref().is_object());
302 JsonbVal::from(jsonb).into()
303 }
304 (DataType::Varchar, Value::Uuid(uuid)) => {
305 uuid.as_hyphenated().to_string().into_boxed_str().into()
306 }
307 (DataType::Map(map_type), Value::Map(map)) => {
308 let Schema::Map(value_schema) = self.lookup_ref(unresolved_schema) else {
309 return Err(create_error());
310 };
311 let schema = value_schema;
312 let mut builder = map_type
313 .clone()
314 .into_struct()
315 .create_array_builder(map.len());
316 for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
324 let value_datum = self
325 .convert_to_datum(schema, v, map_type.value())?
326 .to_owned_datum();
327 builder.append(
328 StructValue::new(vec![Some(k.as_str().into()), value_datum])
329 .to_owned_datum(),
330 );
331 }
332 let list = ListValue::new(builder.finish());
333 MapValue::from_entries(list).into()
334 }
335
336 (_expected, _got) => Err(create_error())?,
337 };
338 Ok(DatumCow::Owned(Some(v)))
339 }
340}
341
/// An `Access` implementation over a decoded Avro value.
pub struct AvroAccess<'a> {
    /// The top-level decoded value that paths are resolved against.
    value: &'a Value,
    /// Root schema and conversion options used to interpret `value`.
    options: AvroParseOptions<'a>,
}
346
347impl<'a> AvroAccess<'a> {
348 pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
349 Self {
350 value: root_value,
351 options,
352 }
353 }
354}
355
356impl Access for AvroAccess<'_> {
357 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
358 let mut value = self.value;
359 let mut unresolved_schema = self.options.root_schema;
360
361 debug_assert!(
362 path.len() == 1
363 || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
364 "unexpected path access: {:?}",
365 path
366 );
367 let mut i = 0;
368 while i < path.len() {
369 let key = path[i];
370 let create_error = || AccessError::Undefined {
371 name: key.to_owned(),
372 path: path.iter().take(i).join("."),
373 };
374 match value {
375 Value::Union(_, v) => {
376 value = v;
400 let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
401 return Err(create_error());
402 };
403 let Some(schema) = get_nullable_union_inner(u) else {
404 return Err(create_error());
405 };
406 unresolved_schema = schema;
407 continue;
408 }
409 Value::Record(fields) => {
410 let Schema::Record(record_schema) =
411 self.options.inner.lookup_ref(unresolved_schema)
412 else {
413 return Err(create_error());
414 };
415 if let Some(idx) = record_schema.lookup.get(key) {
416 value = &fields[*idx].1;
417 unresolved_schema = &record_schema.fields[*idx].schema;
418 i += 1;
419 continue;
420 }
421 }
422 _ => (),
423 }
424 Err(create_error())?;
425 }
426
427 self.options
428 .inner
429 .convert_to_datum(unresolved_schema, value, type_expected)
430 }
431}
432
433pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
435 let variants = union_schema.variants();
436 if variants.len() == 2 && variants.contains(&Schema::Null) {
438 let inner_schema = variants
439 .iter()
440 .find(|s| !matches!(s, &&Schema::Null))
441 .unwrap();
442 Some(inner_schema)
443 } else {
444 None
445 }
446}
447
448pub(crate) fn unix_epoch_days() -> i32 {
449 Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
450}
451
452pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
453 match avro {
454 Value::Null => builder.add_null(),
455 Value::Boolean(b) => builder.add_bool(*b),
456 Value::Int(i) => builder.add_i64(*i as i64),
457 Value::String(s) => builder.add_string(s),
458 Value::Map(m) => {
459 builder.begin_object();
460 for (k, v) in m {
461 builder.add_string(k);
462 avro_to_jsonb(v, builder)?;
463 }
464 builder.end_object()
465 }
466 Value::Record(r) => {
468 builder.begin_object();
469 for (k, v) in r {
470 builder.add_string(k);
471 avro_to_jsonb(v, builder)?;
472 }
473 builder.end_object()
474 }
475 Value::Array(a) => {
476 builder.begin_array();
477 for v in a {
478 avro_to_jsonb(v, builder)?;
479 }
480 builder.end_array()
481 }
482
483 v @ (Value::Long(_)
517 | Value::Float(_)
518 | Value::Double(_)
519 | Value::Bytes(_)
520 | Value::Enum(_, _)
521 | Value::Fixed(_, _)
522 | Value::Date(_)
523 | Value::Decimal(_)
524 | Value::TimeMillis(_)
525 | Value::TimeMicros(_)
526 | Value::TimestampMillis(_)
527 | Value::TimestampMicros(_)
528 | Value::LocalTimestampMillis(_)
529 | Value::LocalTimestampMicros(_)
530 | Value::Duration(_)
531 | Value::Uuid(_)
532 | Value::Union(_, _)) => {
533 bail_uncategorized!(
534 "unimplemented conversion from avro to jsonb: {:?}",
535 ValueKind::from(v)
536 )
537 }
538 }
539 Ok(())
540}
541
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
    use expect_test::expect;
    use risingwave_common::types::{Datum, Decimal};

    use super::*;

    // Snapshot test pinning which union schemas `Schema::parse_str` accepts or rejects.
    #[test]
    fn test_avro_lib_union() {
        // Two `null` branches are rejected as duplicates.
        let s = Schema::parse_str(r#"["null", "null"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two identical primitive branches are rejected as duplicates.
        let s = Schema::parse_str(r#"["int", "int"]"#);
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two maps are rejected as duplicates even though their value types differ.
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "map",
    "values" : "long",
    "default": {}
},
{
    "type": "map",
    "values" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two arrays are rejected as duplicates even though their item types differ.
        let s = Schema::parse_str(
            r#"[
"null",
{
    "type": "array",
    "items" : "long",
    "default": {}
},
{
    "type": "array",
    "items" : "int",
    "default": {}
}
]
"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
        // Two `fixed` types with different names are accepted.
        let s = Schema::parse_str(
            r#"[
"null",
{"type":"fixed","name":"a","size":16},
{"type":"fixed","name":"b","size":32}
]
"#,
        );
        expect![[r#"
            Ok(
                Union(
                    UnionSchema {
                        schemas: [
                            Null,
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "a",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 16,
                                    attributes: {},
                                },
                            ),
                            Fixed(
                                FixedSchema {
                                    name: Name {
                                        name: "b",
                                        namespace: None,
                                    },
                                    aliases: None,
                                    doc: None,
                                    size: 32,
                                    attributes: {},
                                },
                            ),
                        ],
                        variant_index: {
                            Null: 0,
                        },
                    },
                ),
            )
        "#]]
        .assert_debug_eq(&s);

        // A union nested directly inside a union is rejected.
        let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
        expect![[r#"
            Err(
                Unions may not directly contain a union,
            )
        "#]]
        .assert_debug_eq(&s);

        // A logical type is accepted next to `null`.
        let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Null,
                        Uuid,
                    ],
                    variant_index: {
                        Null: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // A logical type is accepted next to its own underlying primitive type.
        let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        String,
                        Uuid,
                    ],
                    variant_index: {
                        String: 0,
                        Uuid: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Same as above for `int` and the `date` logical type.
        let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
        expect![[r#"
            Union(
                UnionSchema {
                    schemas: [
                        Int,
                        Date,
                    ],
                    variant_index: {
                        Int: 0,
                        Date: 1,
                    },
                },
            )
        "#]]
        .assert_debug_eq(&s);
        // Two `fixed` decimals with different names are rejected as duplicates.
        let s = Schema::parse_str(
            r#"[
{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
]"#,
        );
        expect![[r#"
            Err(
                Unions cannot contain duplicate types,
            )
        "#]]
        .assert_debug_eq(&s);
    }

    // Snapshot test documenting how `from_avro_datum` behaves when a reader schema is passed.
    #[test]
    fn test_avro_lib_union_record_bug() {
        // A union with several record branches.
        let s = Schema::parse_str(
            r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "unionTypeComplex",
          "type": [
            "null",
            {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
            {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
            {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
          ]
        }
      ]
    }
            "#,
        )
        .unwrap();

        // 0x06 is zigzag-encoded 3 (the union branch index), 0x0c is zigzag-encoded 6.
        let bytes = hex::decode("060c").unwrap();
        // Without a reader schema the branch index is preserved (3).
        let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None);
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "unionTypeComplex",
                            Union(
                                3,
                                Record(
                                    [
                                        (
                                            "inner",
                                            Int(
                                                6,
                                            ),
                                        ),
                                    ],
                                ),
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&correct_value);
        // With the same schema passed as reader schema the branch index comes back as 2.
        let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s));
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "unionTypeComplex",
                            Union(
                                2,
                                Record(
                                    [
                                        (
                                            "inner",
                                            Int(
                                                6,
                                            ),
                                        ),
                                    ],
                                ),
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&wrong_value);

        // Two record schemas that differ only in the record name (`Root` vs `Root222`).
        let s = Schema::parse_str(
            r#"
    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "a",
          "type": "int"
        }
      ]
    }
            "#,
        )
        .unwrap();
        let s2 = Schema::parse_str(
            r#"
{
    "type": "record",
    "name": "Root222",
    "fields": [
        {
            "name": "a",
            "type": "int"
        }
    ]
}
            "#,
        )
        .unwrap();

        // Decoding with `s` as writer schema and `s2` as reader schema still succeeds.
        let bytes = hex::decode("0c").unwrap();
        let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2));
        expect![[r#"
            Ok(
                Record(
                    [
                        (
                            "a",
                            Int(
                                6,
                            ),
                        ),
                    ],
                ),
            )
        "#]]
        .assert_debug_eq(&value);
    }

    // Checks `scaled_bigint_to_rust_decimal` on several unscaled values and scales.
    #[test]
    fn test_convert_decimal() {
        // Big-endian bytes [1, 24] are 280.
        let v = vec![1, 24];
        let avro_decimal = AvroDecimal::from(v);
        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 0).unwrap();
        assert_eq!(rust_decimal, rust_decimal::Decimal::from(280));

        // Big-endian bytes [1, 25] are 281; with scale 1 that is 28.1.
        let v = vec![1, 25];
        let avro_decimal = AvroDecimal::from(v);
        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 1).unwrap();
        assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());

        // 11234567891 with scale 10 is 1.1234567891.
        let value = BigInt::from(11234567891_i64);
        let decimal = scaled_bigint_to_rust_decimal(value, 10).unwrap();
        assert_eq!(
            decimal,
            rust_decimal::Decimal::try_from(1.1234567891).unwrap()
        );

        // A 12-byte unscaled value with scale 27.
        let v = vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21];
        let avro_decimal = AvroDecimal::from(v);
        let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 27).unwrap();
        assert_eq!(
            rust_decimal,
            rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
        );
    }

    // Test helper: converts one Avro value with `convert_to_datum`, using `value_schema`
    // both as the root schema of the options and as the schema of `value`.
    fn from_avro_value(
        value: Value,
        value_schema: &Schema,
        shape: &DataType,
    ) -> anyhow::Result<Datum> {
        Ok(AvroParseOptions::create(value_schema)
            .inner
            .convert_to_datum(value_schema, &value, shape)?
            .to_owned_datum())
    }

    // The same instant expressed in microseconds and in milliseconds converts to the same
    // `Timestamptz`.
    #[test]
    fn test_avro_timestamptz_micros() {
        let v1 = Value::TimestampMicros(1620000000000000);
        let v2 = Value::TimestampMillis(1620000000000);
        let value_schema1 = Schema::TimestampMicros;
        let value_schema2 = Schema::TimestampMillis;
        let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
        let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
        assert_eq!(
            datum1,
            Some(ScalarImpl::Timestamptz(
                Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
            ))
        );
        assert_eq!(
            datum2,
            Some(ScalarImpl::Timestamptz(
                Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
            ))
        );
    }

    // Converts a `bytes` decimal whose schema declares precision 38 and scale 18.
    #[test]
    fn test_decimal_truncate() {
        let schema = Schema::parse_str(
            r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 38,
                "scale": 18
            }
            "#,
        )
        .unwrap();
        let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
        let value = Value::Decimal(AvroDecimal::from(bytes));
        let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
        assert_eq!(
            resp,
            Some(ScalarImpl::Decimal(Decimal::Normalized(
                rust_decimal::Decimal::from_str("0.017802464409370431").unwrap()
            )))
        );
    }

    // Converts a `VariableScaleDecimal` record (`scale` + `value` fields) into a decimal.
    #[test]
    fn test_variable_scale_decimal() {
        let schema = Schema::parse_str(
            r#"
            {
                "type": "record",
                "name": "VariableScaleDecimal",
                "namespace": "io.debezium.data",
                "fields": [
                    {
                        "name": "scale",
                        "type": "int"
                    },
                    {
                        "name": "value",
                        "type": "bytes"
                    }
                ]
            }
            "#,
        )
        .unwrap();
        // Big-endian bytes 0x010203 are 66051; the scale is 0.
        let value = Value::Record(vec![
            ("scale".to_owned(), Value::Int(0)),
            ("value".to_owned(), Value::Bytes(vec![0x01, 0x02, 0x03])),
        ]);

        let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
        assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
    }
}