1mod schema;
16
17use apache_avro::Schema;
18use apache_avro::schema::{DecimalSchema, NamesRef, UnionSchema};
19use apache_avro::types::{Value, ValueKind};
20use chrono::Datelike;
21use itertools::Itertools;
22use num_bigint::BigInt;
23use risingwave_common::array::{ListValue, StructValue};
24use risingwave_common::types::{
25 DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
26 Timestamptz, ToOwnedDatum,
27};
28
29pub use self::schema::{MapHandling, ResolvedAvroSchema, avro_schema_to_fields};
30use super::utils::scaled_bigint_to_rust_decimal;
31use super::{Access, AccessError, AccessResult, bail_uncategorized, uncategorized};
32use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
33
34#[derive(Clone)]
35pub struct AvroParseOptions<'a> {
37 root_schema: &'a Schema,
39 inner: AvroParseOptionsInner<'a>,
41}
42
43#[derive(Clone)]
44struct AvroParseOptionsInner<'a> {
46 refs: NamesRef<'a>,
48 relax_numeric: bool,
51}
52
53impl<'a> AvroParseOptions<'a> {
54 pub fn create(root_schema: &'a Schema) -> Self {
55 let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema)
56 .expect("avro schema is self contained");
57 Self {
58 root_schema,
59 inner: AvroParseOptionsInner {
60 refs: resolved.get_names().clone(),
61 relax_numeric: true,
62 },
63 }
64 }
65}
66
67impl<'a> AvroParseOptionsInner<'a> {
68 fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema {
69 match schema {
70 Schema::Ref { name } => self.refs[name],
71 _ => schema,
72 }
73 }
74
75 fn convert_to_datum<'b>(
89 &self,
90 unresolved_schema: &'a Schema,
91 value: &'b Value,
92 type_expected: &DataType,
93 ) -> AccessResult<DatumCow<'b>>
94 where
95 'b: 'a,
96 {
97 let create_error = || AccessError::TypeError {
98 expected: format!("{:?}", type_expected),
99 got: format!("{:?}", value),
100 value: String::new(),
101 };
102
103 macro_rules! borrowed {
104 ($v:expr) => {
105 return Ok(DatumCow::Borrowed(Some($v.into())))
106 };
107 }
108
109 let v: ScalarImpl = match (type_expected, value) {
110 (_, Value::Null) => return Ok(DatumCow::NULL),
111 (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
113 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
114 return Err(create_error());
116 };
117
118 if let Some(inner) = get_nullable_union_inner(u) {
119 return self.convert_to_datum(inner, v, type_expected);
121 }
122 let variant_schema = &u.variants()[*variant as usize];
123
124 if matches!(variant_schema, &Schema::Null) {
125 return Ok(DatumCow::NULL);
126 }
127
128 let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
133
134 let mut fields = Vec::with_capacity(struct_type_info.len());
135 for (field_name, field_type) in struct_type_info.iter() {
136 if field_name == expected_field_name {
137 let datum = self
138 .convert_to_datum(variant_schema, v, field_type)?
139 .to_owned_datum();
140
141 fields.push(datum)
142 } else {
143 fields.push(None)
144 }
145 }
146 StructValue::new(fields).into()
147 }
148 (_, Value::Union(_, v)) => {
150 let Schema::Union(u) = self.lookup_ref(unresolved_schema) else {
151 return Err(create_error());
152 };
153 let Some(schema) = get_nullable_union_inner(u) else {
154 return Err(create_error());
155 };
156 return self.convert_to_datum(schema, v, type_expected);
157 }
158 (DataType::Boolean, Value::Boolean(b)) => (*b).into(),
160 (DataType::Int16, Value::Int(i)) if self.relax_numeric => (*i as i16).into(),
162 (DataType::Int16, Value::Long(i)) if self.relax_numeric => (*i as i16).into(),
163
164 (DataType::Int32, Value::Int(i)) => (*i).into(),
166 (DataType::Int32, Value::Long(i)) if self.relax_numeric => (*i as i32).into(),
167 (DataType::Int64, Value::Long(i)) => (*i).into(),
169 (DataType::Int64, Value::Int(i)) if self.relax_numeric => (*i as i64).into(),
170 (DataType::Float32, Value::Float(i)) => (*i).into(),
172 (DataType::Float32, Value::Double(i)) => (*i as f32).into(),
173 (DataType::Float64, Value::Double(i)) => (*i).into(),
175 (DataType::Float64, Value::Float(i)) => (*i as f64).into(),
176 (DataType::Decimal, Value::Decimal(avro_decimal)) => {
178 let (_precision, scale) = match self.lookup_ref(unresolved_schema) {
179 Schema::Decimal(DecimalSchema {
180 precision, scale, ..
181 }) => (*precision, *scale),
182 _ => Err(create_error())?,
183 };
184 let decimal = scaled_bigint_to_rust_decimal(avro_decimal.clone().into(), scale)
185 .map_err(|_| create_error())?;
186 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
187 }
188 (DataType::Decimal, Value::Record(fields)) => {
189 let find_in_records = |field_name: &str| {
191 fields
192 .iter()
193 .find(|field| field.0 == field_name)
194 .map(|field| &field.1)
195 .ok_or_else(|| {
196 uncategorized!("`{field_name}` field not found in VariableScaleDecimal")
197 })
198 };
199 let scale = match find_in_records("scale")? {
200 Value::Int(scale) => *scale,
201 avro_value => bail_uncategorized!(
202 "scale field in VariableScaleDecimal is not int, got {:?}",
203 avro_value
204 ),
205 };
206
207 let value: BigInt = match find_in_records("value")? {
208 Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes),
209 avro_value => bail_uncategorized!(
210 "value field in VariableScaleDecimal is not bytes, got {:?}",
211 avro_value
212 ),
213 };
214
215 let decimal = scaled_bigint_to_rust_decimal(value, scale as _)?;
216 ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
217 }
218 (DataType::Time, Value::TimeMillis(ms)) => Time::with_milli(*ms as u32)
220 .map_err(|_| create_error())?
221 .into(),
222 (DataType::Time, Value::TimeMicros(us)) => Time::with_micro(*us as u64)
223 .map_err(|_| create_error())?
224 .into(),
225 (DataType::Date, Value::Date(days)) => {
227 Date::with_days_since_ce(days + unix_epoch_days())
228 .map_err(|_| create_error())?
229 .into()
230 }
231 (DataType::Varchar, Value::Enum(_, symbol)) => borrowed!(symbol.as_str()),
233 (DataType::Varchar, Value::String(s)) => borrowed!(s.as_str()),
234 (DataType::Timestamp, Value::LocalTimestampMillis(ms)) => Timestamp::with_millis(*ms)
236 .map_err(|_| create_error())?
237 .into(),
238 (DataType::Timestamp, Value::LocalTimestampMicros(us)) => Timestamp::with_micros(*us)
239 .map_err(|_| create_error())?
240 .into(),
241
242 (DataType::Timestamptz, Value::TimestampMillis(ms)) => Timestamptz::from_millis(*ms)
244 .ok_or_else(|| {
245 uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range")
246 })?
247 .into(),
248 (DataType::Timestamptz, Value::TimestampMicros(us)) => {
249 Timestamptz::from_micros(*us).into()
250 }
251
252 (DataType::Interval, Value::Duration(duration)) => {
254 let months = u32::from(duration.months()) as i32;
255 let days = u32::from(duration.days()) as i32;
256 let usecs = (u32::from(duration.millis()) as i64) * 1000; ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
258 }
259 (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({
261 let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else {
262 return Err(create_error());
263 };
264 struct_type_info
265 .iter()
266 .map(|(field_name, field_type)| {
267 if let Some(idx) = record_schema.lookup.get(field_name) {
268 let value = &descs[*idx].1;
269 let schema = &record_schema.fields[*idx].schema;
270 Ok(self
271 .convert_to_datum(schema, value, field_type)?
272 .to_owned_datum())
273 } else {
274 Ok(None)
275 }
276 })
277 .collect::<Result<_, AccessError>>()?
278 })
279 .into(),
280 (DataType::List(list_type), Value::Array(array)) => ListValue::new({
282 let Schema::Array(array_schema) = self.lookup_ref(unresolved_schema) else {
283 return Err(create_error());
284 };
285 let schema = &array_schema.items;
286 let elem_type = list_type.elem();
287 let mut builder = elem_type.create_array_builder(array.len());
288 for v in array {
289 let value = self.convert_to_datum(schema, v, elem_type)?;
290 builder.append(value);
291 }
292 builder.finish()
293 })
294 .into(),
295 (DataType::Bytea, Value::Bytes(value)) => borrowed!(value.as_slice()),
297 (DataType::Jsonb, v @ Value::Map(_)) => {
299 let mut builder = jsonbb::Builder::default();
300 avro_to_jsonb(v, &mut builder)?;
301 let jsonb = builder.finish();
302 debug_assert!(jsonb.as_ref().is_object());
303 JsonbVal::from(jsonb).into()
304 }
305 (DataType::Varchar, Value::Uuid(uuid)) => {
306 uuid.as_hyphenated().to_string().into_boxed_str().into()
307 }
308 (DataType::Map(map_type), Value::Map(map)) => {
309 let Schema::Map(map_schema) = self.lookup_ref(unresolved_schema) else {
310 return Err(create_error());
311 };
312 let schema = &map_schema.types;
313 let mut builder = map_type
314 .clone()
315 .into_struct()
316 .create_array_builder(map.len());
317 for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) {
325 let value_datum = self
326 .convert_to_datum(schema, v, map_type.value())?
327 .to_owned_datum();
328 builder.append(
329 StructValue::new(vec![Some(k.as_str().into()), value_datum])
330 .to_owned_datum(),
331 );
332 }
333 let list = ListValue::new(builder.finish());
334 MapValue::from_entries(list).into()
335 }
336
337 (_expected, _got) => Err(create_error())?,
338 };
339 Ok(DatumCow::Owned(Some(v)))
340 }
341}
342
343pub struct AvroAccess<'a> {
344 value: &'a Value,
345 options: AvroParseOptions<'a>,
346}
347
348impl<'a> AvroAccess<'a> {
349 pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self {
350 Self {
351 value: root_value,
352 options,
353 }
354 }
355}
356
357impl Access for AvroAccess<'_> {
358 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
359 let mut value = self.value;
360 let mut unresolved_schema = self.options.root_schema;
361
362 debug_assert!(
363 path.len() == 1
364 || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
365 "unexpected path access: {:?}",
366 path
367 );
368 let mut i = 0;
369 while i < path.len() {
370 let key = path[i];
371 let create_error = || AccessError::Undefined {
372 name: key.to_owned(),
373 path: path.iter().take(i).join("."),
374 };
375 match value {
376 Value::Union(_, v) => {
377 value = v;
401 let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else {
402 return Err(create_error());
403 };
404 let Some(schema) = get_nullable_union_inner(u) else {
405 return Err(create_error());
406 };
407 unresolved_schema = schema;
408 continue;
409 }
410 Value::Record(fields) => {
411 let Schema::Record(record_schema) =
412 self.options.inner.lookup_ref(unresolved_schema)
413 else {
414 return Err(create_error());
415 };
416 if let Some(idx) = record_schema.lookup.get(key) {
417 value = &fields[*idx].1;
418 unresolved_schema = &record_schema.fields[*idx].schema;
419 i += 1;
420 continue;
421 }
422 }
423 _ => (),
424 }
425 Err(create_error())?;
426 }
427
428 self.options
429 .inner
430 .convert_to_datum(unresolved_schema, value, type_expected)
431 }
432}
433
434pub fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
436 let variants = union_schema.variants();
437 if variants.len() == 2 && variants.contains(&Schema::Null) {
439 let inner_schema = variants
440 .iter()
441 .find(|s| !matches!(s, &&Schema::Null))
442 .unwrap();
443 Some(inner_schema)
444 } else {
445 None
446 }
447}
448
449pub(crate) fn unix_epoch_days() -> i32 {
450 Date::from_ymd_uncheck(1970, 1, 1).0.num_days_from_ce()
451}
452
453pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> AccessResult<()> {
454 match avro {
455 Value::Null => builder.add_null(),
456 Value::Boolean(b) => builder.add_bool(*b),
457 Value::Int(i) => builder.add_i64(*i as i64),
458 Value::String(s) => builder.add_string(s),
459 Value::Map(m) => {
460 builder.begin_object();
461 for (k, v) in m {
462 builder.add_string(k);
463 avro_to_jsonb(v, builder)?;
464 }
465 builder.end_object()
466 }
467 Value::Record(r) => {
469 builder.begin_object();
470 for (k, v) in r {
471 builder.add_string(k);
472 avro_to_jsonb(v, builder)?;
473 }
474 builder.end_object()
475 }
476 Value::Array(a) => {
477 builder.begin_array();
478 for v in a {
479 avro_to_jsonb(v, builder)?;
480 }
481 builder.end_array()
482 }
483
484 v @ (Value::Long(_)
518 | Value::Float(_)
519 | Value::Double(_)
520 | Value::Bytes(_)
521 | Value::Enum(_, _)
522 | Value::Fixed(_, _)
523 | Value::Date(_)
524 | Value::Decimal(_)
525 | Value::BigDecimal(_)
526 | Value::TimeMillis(_)
527 | Value::TimeMicros(_)
528 | Value::TimestampMillis(_)
529 | Value::TimestampMicros(_)
530 | Value::TimestampNanos(_)
531 | Value::LocalTimestampMillis(_)
532 | Value::LocalTimestampMicros(_)
533 | Value::LocalTimestampNanos(_)
534 | Value::Duration(_)
535 | Value::Uuid(_)
536 | Value::Union(_, _)) => {
537 bail_uncategorized!(
538 "unimplemented conversion from avro to jsonb: {:?}",
539 ValueKind::from(v)
540 )
541 }
542 }
543 Ok(())
544}
545
546#[cfg(test)]
547mod tests {
548 use std::str::FromStr;
549
550 use apache_avro::{Decimal as AvroDecimal, from_avro_datum};
551 use expect_test::expect;
552 use risingwave_common::types::{Datum, Decimal};
553
554 use super::*;
555
556 #[test]
 // Snapshot test pinning how `Schema::parse_str` treats various union schemas:
 // which combinations are rejected as duplicates and which are accepted.
558 fn test_avro_lib_union() {
 // Two `null` branches: rejected as duplicate types.
559 let s = Schema::parse_str(r#"["null", "null"]"#);
561 expect![[r#"
562 Err(
563 Error {
564 details: Unions cannot contain duplicate types,
565 },
566 )
567 "#]]
568 .assert_debug_eq(&s);
 // Two `int` branches: rejected as duplicate types.
569 let s = Schema::parse_str(r#"["int", "int"]"#);
570 expect![[r#"
571 Err(
572 Error {
573 details: Unions cannot contain duplicate types,
574 },
575 )
576 "#]]
577 .assert_debug_eq(&s);
 // Two maps with different value types: still rejected as duplicate types.
578 let s = Schema::parse_str(
580 r#"[
581"null",
582{
583 "type": "map",
584 "values" : "long",
585 "default": {}
586},
587{
588 "type": "map",
589 "values" : "int",
590 "default": {}
591}
592]
593"#,
594 );
595 expect![[r#"
596 Err(
597 Error {
598 details: Unions cannot contain duplicate types,
599 },
600 )
601 "#]]
602 .assert_debug_eq(&s);
 // Two arrays with different item types: still rejected as duplicate types.
603 let s = Schema::parse_str(
604 r#"[
605"null",
606{
607 "type": "array",
608 "items" : "long",
609 "default": {}
610},
611{
612 "type": "array",
613 "items" : "int",
614 "default": {}
615}
616]
617"#,
618 );
619 expect![[r#"
620 Err(
621 Error {
622 details: Unions cannot contain duplicate types,
623 },
624 )
625 "#]]
626 .assert_debug_eq(&s);
 // Two `fixed` schemas with different names: accepted.
627 let s = Schema::parse_str(
629 r#"[
630"null",
631{"type":"fixed","name":"a","size":16},
632{"type":"fixed","name":"b","size":32}
633]
634"#,
635 );
636 expect![[r#"
637 Ok(
638 Union(
639 UnionSchema {
640 schemas: [
641 Null,
642 Fixed(
643 FixedSchema {
644 name: Name {
645 name: "a",
646 namespace: None,
647 },
648 aliases: None,
649 doc: None,
650 size: 16,
651 default: None,
652 attributes: {},
653 },
654 ),
655 Fixed(
656 FixedSchema {
657 name: Name {
658 name: "b",
659 namespace: None,
660 },
661 aliases: None,
662 doc: None,
663 size: 32,
664 default: None,
665 attributes: {},
666 },
667 ),
668 ],
669 variant_index: {
670 Null: 0,
671 },
672 },
673 ),
674 )
675 "#]]
676 .assert_debug_eq(&s);
677
 // A union nested directly inside a union: rejected.
678 let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
680 expect![[r#"
681 Err(
682 Error {
683 details: Unions may not directly contain a union,
684 },
685 )
686 "#]]
687 .assert_debug_eq(&s);
688
 // `null` plus a `uuid` logical type: accepted.
689 let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
691 expect![[r#"
692 Union(
693 UnionSchema {
694 schemas: [
695 Null,
696 Uuid,
697 ],
698 variant_index: {
699 Null: 0,
700 Uuid: 1,
701 },
702 },
703 )
704 "#]]
705 .assert_debug_eq(&s);
 // `string` plus a `uuid` logical type (itself a string): accepted as distinct.
706 let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
708 expect![[r#"
709 Union(
710 UnionSchema {
711 schemas: [
712 String,
713 Uuid,
714 ],
715 variant_index: {
716 String: 0,
717 Uuid: 1,
718 },
719 },
720 )
721 "#]]
722 .assert_debug_eq(&s);
 // `int` plus a `date` logical type (itself an int): accepted as distinct.
723 let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
725 expect![[r#"
726 Union(
727 UnionSchema {
728 schemas: [
729 Int,
730 Date,
731 ],
732 variant_index: {
733 Int: 0,
734 Date: 1,
735 },
736 },
737 )
738 "#]]
739 .assert_debug_eq(&s);
 // Two differently named `fixed` schemas that both carry the `decimal` logical
 // type: rejected as duplicate types.
740 let s = Schema::parse_str(
742 r#"[
743{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
744{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
745]"#,
746 );
747 expect![[r#"
748 Err(
749 Error {
750 details: Unions cannot contain duplicate types,
751 },
752 )
753 "#]]
754 .assert_debug_eq(&s);
755 }
756
757 #[test]
 // Snapshot test recording what `from_avro_datum` returns with and without a
 // reader schema, for a union of records and for a renamed record.
758 fn test_avro_lib_union_record_bug() {
 // A record whose single field is a union of `null` and three record types;
 // `Fax` and `Sms` have the same field layout.
759 let s = Schema::parse_str(
761 r#"
762 {
763 "type": "record",
764 "name": "Root",
765 "fields": [
766 {
767 "name": "unionTypeComplex",
768 "type": [
769 "null",
770 {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
771 {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
772 {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
773 ]
774 }
775 ]
776 }
777 "#,
778 )
779 .unwrap();
780
 // Decoding without a reader schema yields union variant 3 with `inner = 6`.
781 let bytes = hex::decode("060c").unwrap();
782 let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None);
784 expect![[r#"
785 Ok(
786 Record(
787 [
788 (
789 "unionTypeComplex",
790 Union(
791 3,
792 Record(
793 [
794 (
795 "inner",
796 Int(
797 6,
798 ),
799 ),
800 ],
801 ),
802 ),
803 ),
804 ],
805 ),
806 )
807 "#]]
808 .assert_debug_eq(&correct_value);
 // Passing the same schema as reader schema reports variant 2 for the same bytes.
809 let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s));
811 expect![[r#"
812 Ok(
813 Record(
814 [
815 (
816 "unionTypeComplex",
817 Union(
818 2,
819 Record(
820 [
821 (
822 "inner",
823 Int(
824 6,
825 ),
826 ),
827 ],
828 ),
829 ),
830 ),
831 ],
832 ),
833 )
834 "#]]
835 .assert_debug_eq(&wrong_value);
836
 // Second scenario: writer schema `Root` and reader schema `Root222` differ only
 // in the record name.
837 let s = Schema::parse_str(
846 r#"
847 {
848 "type": "record",
849 "name": "Root",
850 "fields": [
851 {
852 "name": "a",
853 "type": "int"
854 }
855 ]
856 }
857 "#,
858 )
859 .unwrap();
860 let s2 = Schema::parse_str(
861 r#"
862{
863 "type": "record",
864 "name": "Root222",
865 "fields": [
866 {
867 "name": "a",
868 "type": "int"
869 }
870 ]
871}
872 "#,
873 )
874 .unwrap();
875
 // Decoding still succeeds despite the differing record names.
876 let bytes = hex::decode("0c").unwrap();
877 let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2));
878 expect![[r#"
879 Ok(
880 Record(
881 [
882 (
883 "a",
884 Int(
885 6,
886 ),
887 ),
888 ],
889 ),
890 )
891 "#]]
892 .assert_debug_eq(&value);
893 }
894
895 #[test]
896 fn test_convert_decimal() {
897 let v = vec![1, 24];
899 let avro_decimal = AvroDecimal::from(v);
900 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 0).unwrap();
901 assert_eq!(rust_decimal, rust_decimal::Decimal::from(280));
902
903 let v = vec![1, 25];
905 let avro_decimal = AvroDecimal::from(v);
906 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 1).unwrap();
907 assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
908
909 let value = BigInt::from(11234567891_i64);
911 let decimal = scaled_bigint_to_rust_decimal(value, 10).unwrap();
912 assert_eq!(
913 decimal,
914 rust_decimal::Decimal::try_from(1.1234567891).unwrap()
915 );
916
917 let v = vec![3, 161, 77, 58, 146, 180, 49, 220, 100, 4, 95, 21];
919 let avro_decimal = AvroDecimal::from(v);
920 let rust_decimal = scaled_bigint_to_rust_decimal(avro_decimal.into(), 27).unwrap();
921 assert_eq!(
922 rust_decimal,
923 rust_decimal::Decimal::from_str("1.123456789123456789123456789").unwrap()
924 );
925 }
926
927 fn from_avro_value(
937 value: Value,
938 value_schema: &Schema,
939 shape: &DataType,
940 ) -> anyhow::Result<Datum> {
941 Ok(AvroParseOptions::create(value_schema)
942 .inner
943 .convert_to_datum(value_schema, &value, shape)?
944 .to_owned_datum())
945 }
946
947 #[test]
948 fn test_avro_timestamptz_micros() {
949 let v1 = Value::TimestampMicros(1620000000000000);
950 let v2 = Value::TimestampMillis(1620000000000);
951 let value_schema1 = Schema::TimestampMicros;
952 let value_schema2 = Schema::TimestampMillis;
953 let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
954 let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
955 assert_eq!(
956 datum1,
957 Some(ScalarImpl::Timestamptz(
958 Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
959 ))
960 );
961 assert_eq!(
962 datum2,
963 Some(ScalarImpl::Timestamptz(
964 Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
965 ))
966 );
967 }
968
969 #[test]
970 fn test_decimal_truncate() {
971 let schema = Schema::parse_str(
972 r#"
973 {
974 "type": "bytes",
975 "logicalType": "decimal",
976 "precision": 38,
977 "scale": 18
978 }
979 "#,
980 )
981 .unwrap();
982 let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
983 let value = Value::Decimal(AvroDecimal::from(bytes));
984 let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
985 assert_eq!(
986 resp,
987 Some(ScalarImpl::Decimal(Decimal::Normalized(
988 rust_decimal::Decimal::from_str("0.017802464409370431").unwrap()
989 )))
990 );
991 }
992
993 #[test]
994 fn test_variable_scale_decimal() {
995 let schema = Schema::parse_str(
996 r#"
997 {
998 "type": "record",
999 "name": "VariableScaleDecimal",
1000 "namespace": "io.debezium.data",
1001 "fields": [
1002 {
1003 "name": "scale",
1004 "type": "int"
1005 },
1006 {
1007 "name": "value",
1008 "type": "bytes"
1009 }
1010 ]
1011 }
1012 "#,
1013 )
1014 .unwrap();
1015 let value = Value::Record(vec![
1016 ("scale".to_owned(), Value::Int(0)),
1017 ("value".to_owned(), Value::Bytes(vec![0x01, 0x02, 0x03])),
1018 ]);
1019
1020 let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap();
1021 assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
1022 }
1023}