1use std::collections::HashMap;
16use std::sync::Arc;
17
18use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
19use apache_avro::types::{Record, Value};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
23use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
24use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
25use thiserror_ext::AsReport;
26
27use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
28
/// Result of field-level validation/encoding; the error records the path of the offending field.
type Result<T> = std::result::Result<T, FieldEncodeError>;
/// Owned table from Avro names to their schema definitions, used to resolve
/// `AvroSchema::Ref` nodes back to the full schema they refer to.
struct NamesRef(HashMap<Name, AvroSchema>);
31
/// Encodes RisingWave rows into Avro record values conforming to a fixed Avro schema.
pub struct AvroEncoder {
    /// Schema of the rows handed to the encoder.
    schema: Schema,
    /// Columns of `schema` to encode; `None` selects every column.
    col_indices: Option<Vec<usize>>,
    /// Target Avro schema, validated against the selected columns in `AvroEncoder::new`.
    avro_schema: Arc<AvroSchema>,
    /// Named types of `avro_schema`, used to resolve references while encoding.
    refs: NamesRef,
    /// Wire-format header attached to every encoded value.
    header: AvroHeader,
}
39
/// Wire-format prefix written in front of each Avro-encoded datum.
///
/// Only the schema-registry variants are currently serialized by
/// `SerTo<Vec<u8>> for AvroEncoded`; the other variants produce an "unsupported yet" error there.
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    /// No header at all. Not supported by `ser_to` yet.
    None,
    /// Presumably Avro single-object encoding. Not supported by `ser_to` yet.
    SingleObject,
    /// Presumably an Avro object container file. Not supported by `ser_to` yet.
    ContainerFile,
    /// Confluent Schema Registry framing: a `0` magic byte followed by the schema id,
    /// written as a big-endian `i32`.
    ConfluentSchemaRegistry(i32),
    /// AWS Glue Schema Registry framing: a `3` header-version byte, a `0` byte
    /// (presumably "no compression"), then the 16 bytes of the schema version UUID.
    GlueSchemaRegistry(uuid::Uuid),
}
66
67impl AvroEncoder {
68 pub fn new(
69 schema: Schema,
70 col_indices: Option<Vec<usize>>,
71 avro_schema: Arc<AvroSchema>,
72 header: AvroHeader,
73 ) -> SinkResult<Self> {
74 let refs = NamesRef::new(&avro_schema)?;
75 match &col_indices {
76 Some(col_indices) => validate_fields(
77 col_indices.iter().map(|idx| {
78 let f = &schema[*idx];
79 (f.name.as_str(), &f.data_type)
80 }),
81 &avro_schema,
82 &refs,
83 )?,
84 None => validate_fields(
85 schema
86 .fields
87 .iter()
88 .map(|f| (f.name.as_str(), &f.data_type)),
89 &avro_schema,
90 &refs,
91 )?,
92 };
93
94 Ok(Self {
95 schema,
96 col_indices,
97 avro_schema,
98 refs,
99 header,
100 })
101 }
102}
103
104impl NamesRef {
105 fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
106 let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
107 let refs = resolved
108 .get_names()
109 .iter()
110 .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
111 .collect();
112 Ok(Self(refs))
113 }
114
115 fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
116 match avro {
117 AvroSchema::Ref { name } => &self.0[name],
118 _ => avro,
119 }
120 }
121}
122
/// An Avro value produced by [`AvroEncoder`], bundled with what `ser_to` needs to serialize it.
pub struct AvroEncoded {
    /// The encoded record value.
    value: Value,
    /// Schema the value conforms to; used to serialize the datum.
    schema: Arc<AvroSchema>,
    /// Wire-format header to prepend to the serialized datum.
    header: AvroHeader,
}
128
129impl RowEncoder for AvroEncoder {
130 type Output = AvroEncoded;
131
132 fn schema(&self) -> &Schema {
133 &self.schema
134 }
135
136 fn col_indices(&self) -> Option<&[usize]> {
137 self.col_indices.as_deref()
138 }
139
140 fn encode_cols(
141 &self,
142 row: impl Row,
143 col_indices: impl Iterator<Item = usize>,
144 ) -> SinkResult<Self::Output> {
145 let record = encode_fields(
146 col_indices.map(|idx| {
147 let f = &self.schema[idx];
148 ((f.name.as_str(), &f.data_type), row.datum_at(idx))
149 }),
150 &self.avro_schema,
151 &self.refs,
152 )?;
153 Ok(AvroEncoded {
154 value: record.into(),
155 schema: self.avro_schema.clone(),
156 header: self.header,
157 })
158 }
159}
160
161impl SerTo<Vec<u8>> for AvroEncoded {
162 fn ser_to(self) -> SinkResult<Vec<u8>> {
163 use bytes::BufMut as _;
164
165 let header = match self.header {
166 AvroHeader::ConfluentSchemaRegistry(schema_id) => {
167 let mut buf = Vec::with_capacity(1 + 4);
168 buf.put_u8(0);
169 buf.put_i32(schema_id);
170 buf
171 }
172 AvroHeader::GlueSchemaRegistry(schema_version_id) => {
173 let mut buf = Vec::with_capacity(1 + 1 + 16);
174 buf.put_u8(3);
175 buf.put_u8(0);
176 buf.put_slice(schema_version_id.as_bytes());
177 buf
178 }
179 AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
180 return Err(crate::sink::SinkError::Encode(format!(
181 "{:?} unsupported yet",
182 self.header
183 )));
184 }
185 };
186
187 let raw = apache_avro::to_avro_datum(&self.schema, self.value)
188 .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
189 let mut buf = Vec::with_capacity(header.len() + raw.len());
190 buf.put_slice(&header);
191 buf.put_slice(&raw);
192
193 Ok(buf)
194 }
195}
196
/// How the expected Avro schema of a field wraps its value type in a union, i.e. which union
/// index a value or a null has to be written with.
enum OptIdx {
    /// Not a union: the value is written bare and a null is rejected.
    NotUnion,
    /// Single-variant union `[T]`: values use index 0 and a null is rejected.
    Single,
    /// `["null", T]`: a null uses index 0, values use index 1.
    NullLeft,
    /// `[T, "null"]`: values use index 0, a null uses index 1.
    NullRight,
}
207
/// Abstracts over "validate only" and "validate and encode", so that `on_field` runs one
/// type-compatibility walk for both schema validation and value encoding.
///
/// * `()` only checks that RisingWave types fit the Avro schema (`Out = ()`).
/// * `DatumRef` also converts the datum into an Avro [`Value`] (`Out = Value`).
trait MaybeData: std::fmt::Debug {
    /// Result of processing one field: `()` when validating, a [`Value`] when encoding.
    type Out;

    /// Handles a scalar type; `f` converts a non-null scalar into its Avro value.
    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;

    /// Handles a struct type against the (already resolved) Avro record schema `avro`.
    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a list type, given its element type and the Avro array's item schema.
    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a map type, given its value type and the Avro map's value schema.
    fn on_map(
        self,
        value_type: &DataType,
        avro_value_schema: &AvroSchema,
        refs: &NamesRef,
    ) -> Result<Self::Out>;

    /// Applies the union wrapping described by `opt_idx` to `out`; implementations may reject
    /// a null that the schema cannot hold.
    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}
232
233impl MaybeData for () {
234 type Out = ();
235
236 fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
237 Ok(self)
238 }
239
240 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
241 validate_fields(st.iter(), avro, refs)
242 }
243
244 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
245 on_field(elem, (), avro, refs)
246 }
247
248 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
249 on_field(elem, (), avro, refs)
250 }
251
252 fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
253 Ok(out)
254 }
255}
256
257impl MaybeData for DatumRef<'_> {
258 type Out = Value;
259
260 fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
261 match self {
262 Some(s) => f(s),
263 None => Ok(Value::Null),
264 }
265 }
266
267 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
268 let d = match self {
269 Some(s) => s.into_struct(),
270 None => return Ok(Value::Null),
271 };
272 let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
273 Ok(record.into())
274 }
275
276 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
277 let d = match self {
278 Some(s) => s.into_list(),
279 None => return Ok(Value::Null),
280 };
281 let vs = d
282 .iter()
283 .map(|d| on_field(elem, d, avro, refs))
284 .try_collect()?;
285 Ok(Value::Array(vs))
286 }
287
288 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
289 let d = match self {
290 Some(s) => s.into_map(),
291 None => return Ok(Value::Null),
292 };
293 let vs = d
294 .iter()
295 .map(|(k, v)| {
296 let k = k.into_utf8().to_owned();
297 let v = on_field(elem, v, avro, refs)?;
298 Ok((k, v))
299 })
300 .try_collect()?;
301 Ok(Value::Map(vs))
302 }
303
304 fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
305 use OptIdx::*;
306
307 match out == Value::Null {
308 true => {
309 let ni = match opt_idx {
310 NotUnion | Single => {
311 return Err(FieldEncodeError::new("found null but required"));
312 }
313 NullLeft => 0,
314 NullRight => 1,
315 };
316 Ok(Value::Union(ni, out.into()))
317 }
318 false => {
319 let vi = match opt_idx {
320 NotUnion => return Ok(out),
321 NullLeft => 1,
322 Single | NullRight => 0,
323 };
324 Ok(Value::Union(vi, out.into()))
325 }
326 }
327 }
328}
329
330fn validate_fields<'rw>(
331 rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
332 avro: &AvroSchema,
333 refs: &NamesRef,
334) -> Result<()> {
335 let avro = refs.lookup(avro);
336 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
337 return Err(FieldEncodeError::new(format!(
338 "expect avro record but got {}",
339 avro.canonical_form(),
340 )));
341 };
342 let mut present = vec![false; fields.len()];
343 for (name, t) in rw_fields {
344 let Some(&idx) = lookup.get(name) else {
345 return Err(FieldEncodeError::new("field not in avro").with_name(name));
346 };
347 present[idx] = true;
348 let avro_field = &fields[idx];
349 on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
350 }
351 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
352 if p {
353 continue;
354 }
355 if !avro_field.is_nullable() {
356 return Err(
357 FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
358 );
359 }
360 }
361 Ok(())
362}
363
364fn encode_fields<'avro, 'rw>(
365 fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
366 schema: &'avro AvroSchema,
367 refs: &'avro NamesRef,
368) -> Result<Record<'avro>> {
369 let schema = refs.lookup(schema);
370 let mut record = Record::new(schema).unwrap();
371 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
372 unreachable!()
373 };
374 let mut present = vec![false; fields.len()];
375 for ((name, t), d) in fields_with_datums {
376 let idx = lookup[name];
377 present[idx] = true;
378 let avro_field = &fields[idx];
379 let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
380 record.put(name, value);
381 }
382 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
386 if p {
387 continue;
388 }
389 let AvroSchema::Union(u) = &avro_field.schema else {
390 unreachable!()
391 };
392 let ni = u
396 .variants()
397 .iter()
398 .position(|a| a == &AvroSchema::Null)
399 .unwrap();
400 record.put(
401 &avro_field.name,
402 Value::Union(ni.try_into().unwrap(), Value::Null.into()),
403 );
404 }
405 Ok(record)
406}
407
/// Checks that a RisingWave `data_type` can be written as the Avro schema `expected`. When
/// `maybe` carries a datum, it also converts that datum into the corresponding Avro [`Value`].
///
/// `expected` may be a bare schema, a one-variant union, `["null", T]` or `[T, "null"]`. Any
/// other union shape is rejected. Named references are resolved through `refs`. With
/// `maybe = ()` only compatibility is checked. With a `DatumRef` the datum is encoded and then
/// wrapped into the proper union variant by [`MaybeData::handle_nullable_union`].
///
/// # Errors
/// Returns a [`FieldEncodeError`] when the types are incompatible, or when the datum cannot be
/// represented. Examples: a null for a non-nullable schema, an interval outside the Avro duration
/// range, or a NaN/Inf decimal.
fn on_field<D: MaybeData>(
    data_type: &DataType,
    maybe: D,
    expected: &AvroSchema,
    refs: &NamesRef,
) -> Result<D::Out> {
    use risingwave_common::types::Interval;

    // Shared error for any unsupported RisingWave type / Avro schema combination.
    let no_match_err = || {
        Err(FieldEncodeError::new(format!(
            "cannot encode {} column as {} field",
            data_type,
            expected.canonical_form()
        )))
    };

    // Unwrap an optional union and remember where its `null` variant (if any) sits.
    let (inner, opt_idx) = match expected {
        AvroSchema::Union(union) => match union.variants() {
            [] => return no_match_err(),
            [one] => (one, OptIdx::Single),
            [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
            [l, AvroSchema::Null] => (l, OptIdx::NullRight),
            // Two non-null variants, or three or more variants: not supported.
            _ => return no_match_err(),
        },
        _ => (expected, OptIdx::NotUnion),
    };

    // The remaining schema may be a reference to a named type; resolve it.
    let inner = refs.lookup(inner);

    let value = match &data_type {
        DataType::Boolean => match inner {
            AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
            _ => return no_match_err(),
        },
        DataType::Varchar => match inner {
            AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,
            _ => return no_match_err(),
        },
        DataType::Bytea => match inner {
            AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float32 => match inner {
            AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float64 => match inner {
            AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
            _ => return no_match_err(),
        },
        DataType::Int32 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
            _ => return no_match_err(),
        },
        DataType::Int64 => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
            _ => return no_match_err(),
        },
        DataType::Serial => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
            _ => return no_match_err(),
        },
        DataType::Struct(st) => match inner {
            AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
            _ => return no_match_err(),
        },
        DataType::List(elem) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(elem, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Map(m) => {
            // Avro map keys are always strings, so only varchar-keyed maps can be encoded.
            if *m.key() != DataType::Varchar {
                return no_match_err();
            }
            match inner {
                AvroSchema::Map(avro_value_type) => {
                    maybe.on_map(m.value(), avro_value_type, refs)?
                }
                _ => return no_match_err(),
            }
        }

        // Both microsecond and millisecond timestamp logical types are accepted.
        DataType::Timestamptz => match inner {
            AvroSchema::TimestampMicros => maybe.on_base(|s| {
                Ok(Value::TimestampMicros(
                    s.into_timestamptz().timestamp_micros(),
                ))
            })?,
            AvroSchema::TimestampMillis => maybe.on_base(|s| {
                Ok(Value::TimestampMillis(
                    s.into_timestamptz().timestamp_millis(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // No Avro mapping is provided for timestamps without time zone.
        DataType::Timestamp => return no_match_err(),
        DataType::Date => match inner {
            AvroSchema::Date => {
                maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
            }
            _ => return no_match_err(),
        },
        DataType::Time => match inner {
            AvroSchema::TimeMicros => {
                maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
            }
            // Assumes a time of day stays below roughly 24h, so its millisecond count fits in
            // i32 by a wide margin and the `unwrap` cannot fire.
            AvroSchema::TimeMillis => maybe.on_base(|s| {
                Ok(Value::TimeMillis(
                    (Interval::from(s.into_time()).usecs() / 1000)
                        .try_into()
                        .unwrap(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // Avro `duration` stores months, days and milliseconds as unsigned 32-bit values.
        // Negative or too-large components are reported as overflow. Sub-millisecond precision
        // is truncated by the `/ 1000`.
        DataType::Interval => match inner {
            AvroSchema::Duration => maybe.on_base(|s| {
                use apache_avro::{Days, Duration, Millis, Months};
                let iv = s.into_interval();

                let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));

                Ok(Value::Duration(Duration::new(
                    Months::new(iv.months().try_into().map_err(overflow)?),
                    Days::new(iv.days().try_into().map_err(overflow)?),
                    Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
                )))
            })?,
            _ => return no_match_err(),
        },
        // Widened losslessly into Avro's 32-bit `int`.
        DataType::Int16 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
            _ => return no_match_err(),
        },
        DataType::Decimal => match inner {
            AvroSchema::Decimal(decimal_schema) => {
                maybe.on_base(|s| {
                    match s.into_decimal() {
                        risingwave_common::types::Decimal::Normalized(decimal) => {
                            // Rescale to the Avro schema's scale and emit the unscaled value as
                            // signed big-endian bytes. How extra fractional digits are dropped
                            // is up to `rust_decimal_to_scaled_bigint` (NOTE(review): confirm
                            // rounding vs truncation). The schema's precision is not checked here.
                            let signed_bigint_bytes =
                                rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
                                    .map_err(FieldEncodeError::new)?;
                            Ok(Value::Decimal(apache_avro::Decimal::from(
                                &signed_bigint_bytes,
                            )))
                        }
                        d @ risingwave_common::types::Decimal::NaN
                        | d @ risingwave_common::types::Decimal::NegativeInf
                        | d @ risingwave_common::types::Decimal::PositiveInf => {
                            Err(FieldEncodeError::new(format!(
                                "Avro Decimal does not support NaN or Inf, but got {}",
                                d
                            )))
                        }
                    }
                })?
            }
            _ => return no_match_err(),
        },
        // JSONB is written as its textual representation.
        DataType::Jsonb => match inner {
            AvroSchema::String => {
                maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
            }
            _ => return no_match_err(),
        },
        // Avro has no 256-bit integer type.
        DataType::Int256 => {
            return no_match_err();
        }
    };

    // Wrap into the union variant chosen above (or reject a null the schema cannot hold).
    D::handle_nullable_union(value, opt_idx)
}
594
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use expect_test::expect;
    use itertools::Itertools;
    use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
        StructValue, Time, Timestamptz, ToDatumRef,
    };

    use super::*;

    /// Encodes `rw_datum` of type `rw_type` against the Avro schema `avro_type` (JSON text)
    /// and asserts that the produced value equals `expected`.
    #[track_caller]
    fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
        assert_eq!(actual, expected);
    }

    /// Runs `on_field` with `d` (`()` to validate only, or a datum to encode) and asserts that
    /// it fails with exactly the `expected` error message.
    #[track_caller]
    fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
    where
        D::Out: std::fmt::Debug,
    {
        let avro_schema = AvroSchema::parse_str(avro).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
        assert_eq!(err.to_string(), expected);
    }

    /// Parses the RisingWave type and scalar from text, then validates and encodes them
    /// against `avro_type`. It compares one of three things with `expected`: the validation
    /// error, the encoding error, or the printed encoded value.
    #[track_caller]
    fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let rw_type = DataType::from_str(rw_type).unwrap();
        let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();

        if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
            expected.assert_debug_eq(&validate_err);
            return;
        }
        let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
        match actual {
            Ok(v) => expected.assert_eq(&print_avro_value(&v)),
            Err(e) => expected.assert_debug_eq(&e),
        }
    }

    /// Like `{:?}`, but prints `Value::Map` entries sorted by key so the output is deterministic.
    fn print_avro_value(v: &Value) -> String {
        match v {
            Value::Map(m) => {
                let mut res = "Map({".to_owned();
                for (k, v) in m.iter().sorted_by_key(|x| x.0) {
                    res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
                }
                res.push_str("})");
                res
            }
            _ => format!("{v:?}"),
        }
    }

    #[test]
    fn test_encode_v2() {
        test_v2(
            "boolean",
            "false",
            r#""int""#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode boolean column as \"int\" field",
                    rev_path: [],
                }
            "#]],
        );
        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:2,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "found null but required",
                    rev_path: [],
                }
            "#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[
                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
            ]],
        );

        test_v2(
            "map(int,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
                    rev_path: [],
                }
            "#]],
        );
    }

    #[test]
    fn test_encode_avro_ok() {
        test_ok(
            &DataType::Boolean,
            Some(ScalarImpl::Bool(false)),
            r#""boolean""#,
            Value::Boolean(false),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RisingWave".into())),
            r#""string""#,
            Value::String("RisingWave".into()),
        );

        test_ok(
            &DataType::Bytea,
            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
            r#""bytes""#,
            Value::Bytes([0xbe, 0xef].into()),
        );

        test_ok(
            &DataType::Float32,
            Some(ScalarImpl::Float32(3.5f32.into())),
            r#""float""#,
            Value::Float(3.5f32),
        );

        test_ok(
            &DataType::Float64,
            Some(ScalarImpl::Float64(4.25f64.into())),
            r#""double""#,
            Value::Double(4.25f64),
        );

        test_ok(
            &DataType::Int32,
            Some(ScalarImpl::Int32(16)),
            r#""int""#,
            Value::Int(16),
        );

        test_ok(
            &DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX)),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        test_ok(
            &DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into())),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Value::TimestampMicros(tstz.timestamp_micros()),
        );
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            Value::TimestampMillis(tstz.timestamp_millis()),
        );

        test_ok(
            &DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
            r#"{"type": "int", "logicalType": "date"}"#,
            Value::Date(1),
        );

        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            Value::TimeMicros(1000 * 1_000_000),
        );
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            Value::TimeMillis(1000 * 1000),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MAX)),
            r#""int""#,
            Value::Int(i16::MAX as i32),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MIN)),
            r#""int""#,
            Value::Int(i16::MIN as i32),
        );

        test_ok(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(
                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
            )),
            r#""string""#,
            Value::String(r#"{"a": 1}"#.into()),
        );

        test_ok(
            &DataType::Interval,
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
                13, 2, 1000000,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            Value::Duration(apache_avro::Duration::new(
                apache_avro::Months::new(13),
                apache_avro::Days::new(2),
                apache_avro::Millis::new(1000),
            )),
        );

        // Nested map: map(varchar, map(varchar, int)).
        let mut inner_map_array_builder = MapArrayBuilder::with_type(
            2,
            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
        );
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["a", "b"]),
                ListValue::from_iter([1, 2]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["c", "d"]),
                ListValue::from_iter([3, 4]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        let inner_map_array = inner_map_array_builder.finish();
        test_ok(
            &DataType::Map(MapType::from_kv(
                DataType::Varchar,
                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
            )),
            Some(ScalarImpl::Map(
                MapValue::try_from_kv(
                    ListValue::from_iter(["k1", "k2"]),
                    ListValue::new(inner_map_array.into()),
                )
                .unwrap(),
            )),
            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
            Value::Map(HashMap::from_iter([
                (
                    "k1".into(),
                    Value::Map(HashMap::from_iter([
                        ("a".into(), Value::Int(1)),
                        ("b".into(), Value::Int(2)),
                    ])),
                ),
                (
                    "k2".into(),
                    Value::Map(HashMap::from_iter([
                        ("c".into(), Value::Int(3)),
                        ("d".into(), Value::Int(4)),
                    ])),
                ),
            ])),
        );

        // Nested records, where the second field refers to the first record type by name.
        test_ok(
            &DataType::Struct(StructType::new(vec![
                (
                    "p",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
                (
                    "q",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
            ])),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(-2)),
                    Some(ScalarImpl::Int32(-1)),
                ]))),
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(2)),
                    Some(ScalarImpl::Int32(1)),
                ]))),
            ]))),
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {
                        "name": "p",
                        "type": {
                            "type": "record",
                            "name": "Point",
                            "fields": [
                                {
                                    "name": "x",
                                    "type": "int"
                                },
                                {
                                    "name": "y",
                                    "type": "int"
                                }
                            ]
                        }
                    },
                    {
                        "name": "q",
                        "type": "Point"
                    }
                ]
            }"#,
            Value::Record(vec![
                (
                    "p".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(-2)),
                        ("y".to_owned(), Value::Int(-1)),
                    ]),
                ),
                (
                    "q".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(2)),
                        ("y".to_owned(), Value::Int(1)),
                    ]),
                ),
            ]),
        );

        // A nested JSON document (arrays, nulls, non-ASCII text including an astral-plane
        // emoji) must survive the round trip through an Avro `string`.
        let complex_json = r#"{
            "person": {
                "name": "John Doe",
                "age": 30,
                "address": {
                    "street": "123 Main St.",
                    "city": "New York",
                    "coordinates": [40.7128, -74.0060]
                },
                "contacts": [
                    {"type": "email", "value": "john@example.com"},
                    {"type": "phone", "value": "+1-555-123-4567"}
                ],
                "active": true,
                "preferences": {
                    "notifications": true,
                    "theme": "dark",
                    "languages": ["en", "es"],
                    "lastLogin": null
                },
                "tags": ["premium", "verified"],
                "unicode_test": "Hello, 世界! 🌍"
            }
        }"#;

        let input_json = JsonbVal::from_str(complex_json).unwrap();
        let result = on_field(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
            &AvroSchema::parse_str(r#""string""#).unwrap(),
            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
        )
        .unwrap();

        if let Value::String(result_str) = result {
            // Compare parsed JSON rather than raw text, since key order may differ.
            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
            assert_eq!(
                expected_json, actual_json,
                "JSON values should be equivalent regardless of key order"
            );
        } else {
            panic!("Expected String value");
        };
    }

    #[test]
    fn test_encode_avro_err() {
        test_err(
            &DataType::Interval,
            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
                -1,
                -1,
                i64::MAX,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
        );

        // A hand-built record whose value mismatches its schema is rejected at serialization.
        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::String("2".into()));
        let res: SinkResult<Vec<u8>> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::ConfluentSchemaRegistry(42),
        }
        .ser_to();
        assert_eq!(
            res.unwrap_err().to_string(),
            "Encode error: Value does not match schema"
        );
    }

    #[test]
    fn test_encode_avro_record() {
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {"name": "req", "type": "int"},
                    {"name": "opt", "type": ["null", "long"]}
                ]
            }"#,
        )
        .unwrap();
        let avro_schema = Arc::new(avro_schema);
        let header = AvroHeader::None;

        // Columns in a different order than the Avro fields.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
        ]);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(31)),
            Some(ScalarImpl::Int32(15)),
        ]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(1, Value::Long(31).into())),
            ])
        );

        // A missing nullable field is filled with null.
        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(0, Value::Null.into())),
            ])
        );

        // A missing required field is rejected.
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'req' error: field not present but required"
        );

        // An extra column unknown to the Avro record is rejected.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
            Field::with_name(DataType::Varchar, "extra"),
        ]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'extra' error: field not in avro"
        );

        // The top-level Avro schema must be a record.
        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
        );

        test_err(
            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
            (),
            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
            "encode 'f0' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_array() {
        let avro_schema = r#"{
            "type": "array",
            "items": "int"
        }"#;

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
            avro_schema,
            Value::Array(vec![Value::Int(4), Value::Int(5)]),
        );

        test_err(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
            avro_schema,
            "encode '' error: found null but required",
        );

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
            r#"{
                "type": "array",
                "items": ["null", "int"]
            }"#,
            Value::Array(vec![
                Value::Union(1, Value::Int(4).into()),
                Value::Union(0, Value::Null.into()),
            ]),
        );

        test_ok(
            &DataType::List(DataType::List(DataType::Int32.into()).into()),
            Some(ScalarImpl::List(ListValue::from_iter([
                ListValue::from_iter([26, 29]),
                ListValue::from_iter([46, 49]),
            ]))),
            r#"{
                "type": "array",
                "items": {
                    "type": "array",
                    "items": "int"
                }
            }"#,
            Value::Array(vec![
                Value::Array(vec![Value::Int(26), Value::Int(29)]),
                Value::Array(vec![Value::Int(46), Value::Int(49)]),
            ]),
        );

        test_err(
            &DataType::List(DataType::Boolean.into()),
            (),
            r#"{"type": "array", "items": "int"}"#,
            "encode '' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_union() {
        let t = &DataType::Timestamptz;
        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let empty = "[]";
        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;

        test_ok(
            t,
            datum.clone(),
            opt_micros,
            Value::Union(1, Value::TimestampMicros(1500).into()),
        );
        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
        test_ok(
            t,
            datum.clone(),
            opt_millis,
            Value::Union(1, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));

        test_err(
            t,
            datum.to_datum_ref(),
            both,
            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
        );

        test_err(
            t,
            datum.to_datum_ref(),
            empty,
            "encode '' error: cannot encode timestamp with time zone column as [] field",
        );

        test_ok(
            t,
            datum.clone(),
            one,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_err(t, None, one, "encode '' error: found null but required");

        test_ok(
            t,
            datum.clone(),
            right,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
    }

    /// Demonstrates pitfalls of the upstream `apache_avro` writer rather than of our encoder.
    ///
    /// Two kinds of record built with `Record::new` + `put` are accepted on write but read back
    /// wrongly: one that omits a field, and one that puts a bare (non-`Union`) value into a
    /// union field. `encode_fields` avoids both by wrapping union values and writing omitted
    /// fields explicitly.
    #[test]
    fn test_encode_avro_lib_bug() {
        use apache_avro::{Reader, Writer};

        // A record with two optional int fields.
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {
                        "name": "f0",
                        "type": ["null", "int"]
                    },
                    {
                        "name": "f1",
                        "type": ["null", "int"]
                    }
                ]
            }"#,
        )
        .unwrap();

        // f0 omitted, f1 = bare Int(3): the write succeeds but reading fails.
        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Int(3));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap_err().to_string(),
                "Union index 3 out of bounds: 2"
            );
        }

        // f0 omitted, f1 = Union(1, Int(3)): the write succeeds but the fields read back swapped.
        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Union(1, Value::Int(3).into()));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap(),
                Value::Record(vec![
                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
                    ("f1".into(), Value::Union(0, Value::Null.into())),
                ])
            );
        }
    }
}