1use std::collections::HashMap;
16use std::sync::Arc;
17
18use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
19use apache_avro::types::{Record, Value};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
23use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
24use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
25use thiserror_ext::AsReport;
26
27use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
28
/// Result type of the field-level helpers in this module. Errors are `FieldEncodeError`s,
/// which callers annotate with the offending field name via `with_name`.
type Result<T> = std::result::Result<T, FieldEncodeError>;
/// Owned copy of the named schemas collected by `ResolvedSchema::get_names` for the root
/// Avro schema; used by `NamesRef::lookup` to resolve `AvroSchema::Ref` nodes.
struct NamesRef(HashMap<Name, AvroSchema>);
31
/// Encodes RisingWave rows into Avro values conforming to a fixed Avro record schema.
///
/// The column-to-field mapping is validated once in [`AvroEncoder::new`]; encoding relies
/// on that validation having succeeded.
pub struct AvroEncoder {
    /// RisingWave schema of the rows being encoded.
    schema: Schema,
    /// Optional subset of column indices into `schema`; `None` validates all columns.
    col_indices: Option<Vec<usize>>,
    /// Target Avro schema; its root must resolve to a record (checked in `new`).
    avro_schema: Arc<AvroSchema>,
    /// Named schemas of `avro_schema`, used to resolve references.
    refs: NamesRef,
    /// Framing that `AvroEncoded::ser_to` writes in front of the Avro datum.
    header: AvroHeader,
}
39
/// Framing written in front of the Avro binary datum by `AvroEncoded::ser_to`.
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    /// No framing. Currently rejected by `ser_to` as unsupported.
    None,
    /// Avro single-object encoding. Currently rejected by `ser_to` as unsupported.
    SingleObject,
    /// Avro object container file. Currently rejected by `ser_to` as unsupported.
    ContainerFile,
    /// Confluent Schema Registry framing: a `0` byte followed by the schema id as a
    /// 4-byte big-endian integer.
    ConfluentSchemaRegistry(i32),
    /// AWS Glue Schema Registry framing: a `3` byte, a `0` byte (presumably "no
    /// compression" in Glue's wire format), then the 16 bytes of the schema version UUID.
    GlueSchemaRegistry(uuid::Uuid),
}
66
67impl AvroEncoder {
68 pub fn new(
69 schema: Schema,
70 col_indices: Option<Vec<usize>>,
71 avro_schema: Arc<AvroSchema>,
72 header: AvroHeader,
73 ) -> SinkResult<Self> {
74 let refs = NamesRef::new(&avro_schema)?;
75 match &col_indices {
76 Some(col_indices) => validate_fields(
77 col_indices.iter().map(|idx| {
78 let f = &schema[*idx];
79 (f.name.as_str(), &f.data_type)
80 }),
81 &avro_schema,
82 &refs,
83 )?,
84 None => validate_fields(
85 schema
86 .fields
87 .iter()
88 .map(|f| (f.name.as_str(), &f.data_type)),
89 &avro_schema,
90 &refs,
91 )?,
92 };
93
94 Ok(Self {
95 schema,
96 col_indices,
97 avro_schema,
98 refs,
99 header,
100 })
101 }
102}
103
104impl NamesRef {
105 fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
106 let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
107 let refs = resolved
108 .get_names()
109 .iter()
110 .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
111 .collect();
112 Ok(Self(refs))
113 }
114
115 fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
116 match avro {
117 AvroSchema::Ref { name } => &self.0[name],
118 _ => avro,
119 }
120 }
121}
122
/// An Avro value produced by [`AvroEncoder`], bundled with what `ser_to` needs to
/// serialize it.
pub struct AvroEncoded {
    /// The encoded value (a record when produced by `AvroEncoder::encode_cols`).
    value: Value,
    /// Schema passed to `apache_avro::to_avro_datum` when serializing `value`.
    schema: Arc<AvroSchema>,
    /// Framing that `ser_to` prepends to the datum bytes.
    header: AvroHeader,
}
128
129impl RowEncoder for AvroEncoder {
130 type Output = AvroEncoded;
131
132 fn schema(&self) -> &Schema {
133 &self.schema
134 }
135
136 fn col_indices(&self) -> Option<&[usize]> {
137 self.col_indices.as_deref()
138 }
139
140 fn encode_cols(
141 &self,
142 row: impl Row,
143 col_indices: impl Iterator<Item = usize>,
144 ) -> SinkResult<Self::Output> {
145 let record = encode_fields(
146 col_indices.map(|idx| {
147 let f = &self.schema[idx];
148 ((f.name.as_str(), &f.data_type), row.datum_at(idx))
149 }),
150 &self.avro_schema,
151 &self.refs,
152 )?;
153 Ok(AvroEncoded {
154 value: record.into(),
155 schema: self.avro_schema.clone(),
156 header: self.header,
157 })
158 }
159}
160
161impl SerTo<Vec<u8>> for AvroEncoded {
162 fn ser_to(self) -> SinkResult<Vec<u8>> {
163 use bytes::BufMut as _;
164
165 let header = match self.header {
166 AvroHeader::ConfluentSchemaRegistry(schema_id) => {
167 let mut buf = Vec::with_capacity(1 + 4);
168 buf.put_u8(0);
169 buf.put_i32(schema_id);
170 buf
171 }
172 AvroHeader::GlueSchemaRegistry(schema_version_id) => {
173 let mut buf = Vec::with_capacity(1 + 1 + 16);
174 buf.put_u8(3);
175 buf.put_u8(0);
176 buf.put_slice(schema_version_id.as_bytes());
177 buf
178 }
179 AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
180 return Err(crate::sink::SinkError::Encode(format!(
181 "{:?} unsupported yet",
182 self.header
183 )));
184 }
185 };
186
187 let raw = apache_avro::to_avro_datum(&self.schema, self.value)
188 .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
189 let mut buf = Vec::with_capacity(header.len() + raw.len());
190 buf.put_slice(&header);
191 buf.put_slice(&raw);
192
193 Ok(buf)
194 }
195}
196
/// Shape of the expected Avro schema around a value, as classified by `on_field`; it
/// decides the union branch index used when encoding.
enum OptIdx {
    /// Not a union: the value is written as-is and null is rejected.
    NotUnion,
    /// A union with exactly one variant: non-null values use branch 0; null is rejected.
    Single,
    /// `["null", T]`: null uses branch 0, non-null values use branch 1.
    NullLeft,
    /// `[T, "null"]`: non-null values use branch 0, null uses branch 1.
    NullRight,
}
207
/// Abstracts over the two passes that share `on_field`'s type-matching logic: `()` only
/// validates that a RisingWave type fits an Avro schema, while `DatumRef` also produces the
/// Avro `Value` for a datum.
trait MaybeData: std::fmt::Debug {
    /// `()` for the validation pass, `Value` for the encoding pass.
    type Out;

    /// Handles a scalar type; `f` converts a non-null scalar into its Avro value.
    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;

    /// Handles a struct type against the (reference-resolved) Avro record schema `avro`.
    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a list type; `avro` is the schema of the Avro array items.
    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a map type whose keys `on_field` has already checked to be varchar;
    /// `avro_value_schema` is the schema of the Avro map values.
    fn on_map(
        self,
        value_type: &DataType,
        avro_value_schema: &AvroSchema,
        refs: &NamesRef,
    ) -> Result<Self::Out>;

    /// Post-processes `out` according to where (if anywhere) the schema allows null.
    /// For encoding, this wraps `out` in `Value::Union` and rejects disallowed nulls.
    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}
232
233impl MaybeData for () {
234 type Out = ();
235
236 fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
237 Ok(self)
238 }
239
240 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
241 validate_fields(st.iter(), avro, refs)
242 }
243
244 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
245 on_field(elem, (), avro, refs)
246 }
247
248 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
249 on_field(elem, (), avro, refs)
250 }
251
252 fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
253 Ok(out)
254 }
255}
256
257impl MaybeData for DatumRef<'_> {
258 type Out = Value;
259
260 fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
261 match self {
262 Some(s) => f(s),
263 None => Ok(Value::Null),
264 }
265 }
266
267 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
268 let d = match self {
269 Some(s) => s.into_struct(),
270 None => return Ok(Value::Null),
271 };
272 let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
273 Ok(record.into())
274 }
275
276 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
277 let d = match self {
278 Some(s) => s.into_list(),
279 None => return Ok(Value::Null),
280 };
281 let vs = d
282 .iter()
283 .map(|d| on_field(elem, d, avro, refs))
284 .try_collect()?;
285 Ok(Value::Array(vs))
286 }
287
288 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
289 let d = match self {
290 Some(s) => s.into_map(),
291 None => return Ok(Value::Null),
292 };
293 let vs = d
294 .iter()
295 .map(|(k, v)| {
296 let k = k.into_utf8().to_owned();
297 let v = on_field(elem, v, avro, refs)?;
298 Ok((k, v))
299 })
300 .try_collect()?;
301 Ok(Value::Map(vs))
302 }
303
304 fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
305 use OptIdx::*;
306
307 match out == Value::Null {
308 true => {
309 let ni = match opt_idx {
310 NotUnion | Single => {
311 return Err(FieldEncodeError::new("found null but required"));
312 }
313 NullLeft => 0,
314 NullRight => 1,
315 };
316 Ok(Value::Union(ni, out.into()))
317 }
318 false => {
319 let vi = match opt_idx {
320 NotUnion => return Ok(out),
321 NullLeft => 1,
322 Single | NullRight => 0,
323 };
324 Ok(Value::Union(vi, out.into()))
325 }
326 }
327 }
328}
329
330fn validate_fields<'rw>(
331 rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
332 avro: &AvroSchema,
333 refs: &NamesRef,
334) -> Result<()> {
335 let avro = refs.lookup(avro);
336 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
337 return Err(FieldEncodeError::new(format!(
338 "expect avro record but got {}",
339 avro.canonical_form(),
340 )));
341 };
342 let mut present = vec![false; fields.len()];
343 for (name, t) in rw_fields {
344 let Some(&idx) = lookup.get(name) else {
345 return Err(FieldEncodeError::new("field not in avro").with_name(name));
346 };
347 present[idx] = true;
348 let avro_field = &fields[idx];
349 on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
350 }
351 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
352 if p {
353 continue;
354 }
355 if !avro_field.is_nullable() {
356 return Err(
357 FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
358 );
359 }
360 }
361 Ok(())
362}
363
364fn encode_fields<'avro, 'rw>(
365 fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
366 schema: &'avro AvroSchema,
367 refs: &'avro NamesRef,
368) -> Result<Record<'avro>> {
369 let schema = refs.lookup(schema);
370 let mut record = Record::new(schema).unwrap();
371 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
372 unreachable!()
373 };
374 let mut present = vec![false; fields.len()];
375 for ((name, t), d) in fields_with_datums {
376 let idx = lookup[name];
377 present[idx] = true;
378 let avro_field = &fields[idx];
379 let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
380 record.put(name, value);
381 }
382 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
386 if p {
387 continue;
388 }
389 let AvroSchema::Union(u) = &avro_field.schema else {
390 unreachable!()
391 };
392 let ni = u
396 .variants()
397 .iter()
398 .position(|a| a == &AvroSchema::Null)
399 .unwrap();
400 record.put(
401 &avro_field.name,
402 Value::Union(ni.try_into().unwrap(), Value::Null.into()),
403 );
404 }
405 Ok(record)
406}
407
408fn on_field<D: MaybeData>(
411 data_type: &DataType,
412 maybe: D,
413 expected: &AvroSchema,
414 refs: &NamesRef,
415) -> Result<D::Out> {
416 use risingwave_common::types::Interval;
417
418 let no_match_err = || {
419 Err(FieldEncodeError::new(format!(
420 "cannot encode {} column as {} field",
421 data_type,
422 expected.canonical_form()
423 )))
424 };
425
426 let (inner, opt_idx) = match expected {
429 AvroSchema::Union(union) => match union.variants() {
430 [] => return no_match_err(),
431 [one] => (one, OptIdx::Single),
432 [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
433 [l, AvroSchema::Null] => (l, OptIdx::NullRight),
434 _ => return no_match_err(),
435 },
436 _ => (expected, OptIdx::NotUnion),
437 };
438
439 let inner = refs.lookup(inner);
440
441 let value = match &data_type {
442 DataType::Boolean => match inner {
444 AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
445 _ => return no_match_err(),
446 },
447 DataType::Varchar => match inner {
448 AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,
449 _ => return no_match_err(),
450 },
451 DataType::Bytea => match inner {
452 AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
453 _ => return no_match_err(),
454 },
455 DataType::Float32 => match inner {
456 AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
457 _ => return no_match_err(),
458 },
459 DataType::Float64 => match inner {
460 AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
461 _ => return no_match_err(),
462 },
463 DataType::Int32 => match inner {
464 AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
465 _ => return no_match_err(),
466 },
467 DataType::Int64 => match inner {
468 AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
469 _ => return no_match_err(),
470 },
471 DataType::Serial => match inner {
472 AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
473 _ => return no_match_err(),
474 },
475 DataType::Struct(st) => match inner {
476 AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
477 _ => return no_match_err(),
478 },
479 DataType::List(elem) => match inner {
480 AvroSchema::Array(avro_elem) => maybe.on_list(elem, avro_elem, refs)?,
481 _ => return no_match_err(),
482 },
483 DataType::Map(m) => {
484 if *m.key() != DataType::Varchar {
485 return no_match_err();
486 }
487 match inner {
488 AvroSchema::Map(avro_value_type) => {
489 maybe.on_map(m.value(), avro_value_type, refs)?
490 }
491 _ => return no_match_err(),
492 }
493 }
494
495 DataType::Timestamptz => match inner {
497 AvroSchema::TimestampMicros => maybe.on_base(|s| {
498 Ok(Value::TimestampMicros(
499 s.into_timestamptz().timestamp_micros(),
500 ))
501 })?,
502 AvroSchema::TimestampMillis => maybe.on_base(|s| {
503 Ok(Value::TimestampMillis(
504 s.into_timestamptz().timestamp_millis(),
505 ))
506 })?,
507 _ => return no_match_err(),
508 },
509 DataType::Timestamp => return no_match_err(),
510 DataType::Date => match inner {
511 AvroSchema::Date => {
512 maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
513 }
514 _ => return no_match_err(),
515 },
516 DataType::Time => match inner {
517 AvroSchema::TimeMicros => {
518 maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
519 }
520 AvroSchema::TimeMillis => maybe.on_base(|s| {
521 Ok(Value::TimeMillis(
522 (Interval::from(s.into_time()).usecs() / 1000)
523 .try_into()
524 .unwrap(),
525 ))
526 })?,
527 _ => return no_match_err(),
528 },
529 DataType::Interval => match inner {
530 AvroSchema::Duration => maybe.on_base(|s| {
531 use apache_avro::{Days, Duration, Millis, Months};
532 let iv = s.into_interval();
533
534 let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));
535
536 Ok(Value::Duration(Duration::new(
537 Months::new(iv.months().try_into().map_err(overflow)?),
538 Days::new(iv.days().try_into().map_err(overflow)?),
539 Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
540 )))
541 })?,
542 _ => return no_match_err(),
543 },
544 DataType::Int16 => match inner {
546 AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
547 _ => return no_match_err(),
548 },
549 DataType::Decimal => match inner {
550 AvroSchema::Decimal(decimal_schema) => {
551 maybe.on_base(|s| {
552 match s.into_decimal() {
553 risingwave_common::types::Decimal::Normalized(decimal) => {
554 let signed_bigint_bytes =
561 rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
562 .map_err(FieldEncodeError::new)?;
563 Ok(Value::Decimal(apache_avro::Decimal::from(
564 &signed_bigint_bytes,
565 )))
566 }
567 d @ risingwave_common::types::Decimal::NaN
568 | d @ risingwave_common::types::Decimal::NegativeInf
569 | d @ risingwave_common::types::Decimal::PositiveInf => {
570 Err(FieldEncodeError::new(format!(
571 "Avro Decimal does not support NaN or Inf, but got {}",
572 d
573 )))
574 }
575 }
576 })?
577 }
578 _ => return no_match_err(),
579 },
580 DataType::Jsonb => match inner {
581 AvroSchema::String => {
582 maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
583 }
584 _ => return no_match_err(),
585 },
586 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
587 DataType::Int256 => {
589 return no_match_err();
590 }
591 };
592
593 D::handle_nullable_union(value, opt_idx)
594}
595
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use expect_test::expect;
    use itertools::Itertools;
    use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
        StructValue, Time, Timestamptz, ToDatumRef,
    };

    use super::*;

    // Encodes `rw_datum` against the Avro schema JSON `avro_type` and asserts the produced
    // Avro value equals `expected`. The validation pass is not run here.
    #[track_caller]
    fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
        assert_eq!(actual, expected);
    }

    // Runs `on_field` with `d` (`()` for validation, a datum for encoding) and asserts it
    // fails with the `Display` message `expected`.
    #[track_caller]
    fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
    where
        D::Out: std::fmt::Debug,
    {
        let avro_schema = AvroSchema::parse_str(avro).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
        assert_eq!(err.to_string(), expected);
    }

    // Parses the RisingWave type and scalar from text, runs the validation pass first, and
    // snapshots whichever comes first: the validation error, the encode error, or the
    // printed Avro value.
    #[track_caller]
    fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let rw_type = DataType::from_str(rw_type).unwrap();
        let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();

        if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
            expected.assert_debug_eq(&validate_err);
            return;
        }
        let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
        match actual {
            Ok(v) => expected.assert_eq(&print_avro_value(&v)),
            Err(e) => expected.assert_debug_eq(&e),
        }
    }

    // Debug-prints an Avro value. Map entries are printed sorted by key (recursing into
    // values that are themselves maps), so snapshots do not depend on `HashMap` order.
    // Any other value uses its `Debug` output.
    fn print_avro_value(v: &Value) -> String {
        match v {
            Value::Map(m) => {
                let mut res = "Map({".to_owned();
                for (k, v) in m.iter().sorted_by_key(|x| x.0) {
                    res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
                }
                res.push_str("})");
                res
            }
            _ => format!("{v:?}"),
        }
    }

    #[test]
    fn test_encode_v2() {
        // Type mismatch is caught by the validation pass.
        test_v2(
            "boolean",
            "false",
            r#""int""#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode boolean column as \"int\" field",
                    rev_path: [],
                }
            "#]],
        );
        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:2,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
        );

        // A null map value needs a nullable Avro value schema.
        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "found null but required",
                    rev_path: [],
                }
            "#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[
                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
            ]],
        );

        // Avro maps only support string keys.
        test_v2(
            "map(int,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
                    rev_path: [],
                }
            "#]],
        );
    }

    #[test]
    fn test_encode_avro_ok() {
        test_ok(
            &DataType::Boolean,
            Some(ScalarImpl::Bool(false)),
            r#""boolean""#,
            Value::Boolean(false),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RisingWave".into())),
            r#""string""#,
            Value::String("RisingWave".into()),
        );

        test_ok(
            &DataType::Bytea,
            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
            r#""bytes""#,
            Value::Bytes([0xbe, 0xef].into()),
        );

        test_ok(
            &DataType::Float32,
            Some(ScalarImpl::Float32(3.5f32.into())),
            r#""float""#,
            Value::Float(3.5f32),
        );

        test_ok(
            &DataType::Float64,
            Some(ScalarImpl::Float64(4.25f64.into())),
            r#""double""#,
            Value::Double(4.25f64),
        );

        test_ok(
            &DataType::Int32,
            Some(ScalarImpl::Int32(16)),
            r#""int""#,
            Value::Int(16),
        );

        test_ok(
            &DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX)),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        test_ok(
            &DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into())),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        // Timestamptz maps to both timestamp logical types.
        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Value::TimestampMicros(tstz.timestamp_micros()),
        );
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            Value::TimestampMillis(tstz.timestamp_millis()),
        );

        test_ok(
            &DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
            r#"{"type": "int", "logicalType": "date"}"#,
            Value::Date(1),
        );

        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            Value::TimeMicros(1000 * 1_000_000),
        );
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            Value::TimeMillis(1000 * 1000),
        );

        // Int16 widens into an Avro int at both extremes.
        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MAX)),
            r#""int""#,
            Value::Int(i16::MAX as i32),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MIN)),
            r#""int""#,
            Value::Int(i16::MIN as i32),
        );

        test_ok(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(
                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
            )),
            r#""string""#,
            Value::String(r#"{"a": 1}"#.into()),
        );

        test_ok(
            &DataType::Interval,
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
                13, 2, 1000000,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            Value::Duration(apache_avro::Duration::new(
                apache_avro::Months::new(13),
                apache_avro::Days::new(2),
                apache_avro::Millis::new(1000),
            )),
        );

        // Nested map: map(varchar, map(varchar, int)).
        let mut inner_map_array_builder = MapArrayBuilder::with_type(
            2,
            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
        );
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["a", "b"]),
                ListValue::from_iter([1, 2]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["c", "d"]),
                ListValue::from_iter([3, 4]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        let inner_map_array = inner_map_array_builder.finish();
        test_ok(
            &DataType::Map(MapType::from_kv(
                DataType::Varchar,
                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
            )),
            Some(ScalarImpl::Map(
                MapValue::try_from_kv(
                    ListValue::from_iter(["k1", "k2"]),
                    ListValue::new(inner_map_array.into()),
                )
                .unwrap(),
            )),
            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
            Value::Map(HashMap::from_iter([
                (
                    "k1".into(),
                    Value::Map(HashMap::from_iter([
                        ("a".into(), Value::Int(1)),
                        ("b".into(), Value::Int(2)),
                    ])),
                ),
                (
                    "k2".into(),
                    Value::Map(HashMap::from_iter([
                        ("c".into(), Value::Int(3)),
                        ("d".into(), Value::Int(4)),
                    ])),
                ),
            ])),
        );

        // Nested structs, where the second field refers to the first field's record by
        // name ("Point"), exercising `NamesRef` resolution.
        test_ok(
            &DataType::Struct(StructType::new(vec![
                (
                    "p",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
                (
                    "q",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
            ])),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(-2)),
                    Some(ScalarImpl::Int32(-1)),
                ]))),
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(2)),
                    Some(ScalarImpl::Int32(1)),
                ]))),
            ]))),
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {
                        "name": "p",
                        "type": {
                            "type": "record",
                            "name": "Point",
                            "fields": [
                                {
                                    "name": "x",
                                    "type": "int"
                                },
                                {
                                    "name": "y",
                                    "type": "int"
                                }
                            ]
                        }
                    },
                    {
                        "name": "q",
                        "type": "Point"
                    }
                ]
            }"#,
            Value::Record(vec![
                (
                    "p".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(-2)),
                        ("y".to_owned(), Value::Int(-1)),
                    ]),
                ),
                (
                    "q".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(2)),
                        ("y".to_owned(), Value::Int(1)),
                    ]),
                ),
            ]),
        );

        // Jsonb is written as its string form; the output is compared as parsed JSON
        // because key order in the rendered string is not asserted.
        let complex_json = r#"{
            "person": {
                "name": "John Doe",
                "age": 30,
                "address": {
                    "street": "123 Main St.",
                    "city": "New York",
                    "coordinates": [40.7128, -74.0060]
                },
                "contacts": [
                    {"type": "email", "value": "john@example.com"},
                    {"type": "phone", "value": "+1-555-123-4567"}
                ],
                "active": true,
                "preferences": {
                    "notifications": true,
                    "theme": "dark",
                    "languages": ["en", "es"],
                    "lastLogin": null
                },
                "tags": ["premium", "verified"],
                "unicode_test": "Hello, δΈη! π"
            }
        }"#;

        let input_json = JsonbVal::from_str(complex_json).unwrap();
        let result = on_field(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
            &AvroSchema::parse_str(r#""string""#).unwrap(),
            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
        )
        .unwrap();

        if let Value::String(result_str) = result {
            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
            assert_eq!(
                expected_json, actual_json,
                "JSON values should be equivalent regardless of key order"
            );
        } else {
            panic!("Expected String value");
        };
    }

    #[test]
    fn test_encode_avro_err() {
        // Negative months/days cannot be represented in an (unsigned) Avro duration.
        test_err(
            &DataType::Interval,
            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
                -1,
                -1,
                i64::MAX,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
        );

        // A value that does not match its schema surfaces as a sink encode error from
        // `ser_to`.
        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::String("2".into()));
        let res: SinkResult<Vec<u8>> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::ConfluentSchemaRegistry(42),
        }
        .ser_to();
        assert_eq!(
            res.unwrap_err().to_string(),
            "Encode error: Value does not match schema"
        );
    }

    #[test]
    fn test_encode_avro_record() {
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {"name": "req", "type": "int"},
                    {"name": "opt", "type": ["null", "long"]}
                ]
            }"#,
        )
        .unwrap();
        let avro_schema = Arc::new(avro_schema);
        let header = AvroHeader::None;

        // All fields present; column order differs from the Avro record's field order.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
        ]);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(31)),
            Some(ScalarImpl::Int32(15)),
        ]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(1, Value::Long(31).into())),
            ])
        );

        // Omitted nullable field is filled with the union's null branch.
        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(0, Value::Null.into())),
            ])
        );

        // Omitting a required field is rejected at construction.
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'req' error: field not present but required"
        );

        // A column with no matching Avro field is rejected at construction.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
            Field::with_name(DataType::Varchar, "extra"),
        ]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'extra' error: field not in avro"
        );

        // The root Avro schema must be a record.
        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
        );

        // Nested field errors are reported with the field name.
        test_err(
            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
            (),
            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
            "encode 'f0' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_array() {
        let avro_schema = r#"{
            "type": "array",
            "items": "int"
        }"#;

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
            avro_schema,
            Value::Array(vec![Value::Int(4), Value::Int(5)]),
        );

        // Null elements need nullable array items.
        test_err(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
            avro_schema,
            "encode '' error: found null but required",
        );

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
            r#"{
                "type": "array",
                "items": ["null", "int"]
            }"#,
            Value::Array(vec![
                Value::Union(1, Value::Int(4).into()),
                Value::Union(0, Value::Null.into()),
            ]),
        );

        test_ok(
            &DataType::List(DataType::List(DataType::Int32.into()).into()),
            Some(ScalarImpl::List(ListValue::from_iter([
                ListValue::from_iter([26, 29]),
                ListValue::from_iter([46, 49]),
            ]))),
            r#"{
                "type": "array",
                "items": {
                    "type": "array",
                    "items": "int"
                }
            }"#,
            Value::Array(vec![
                Value::Array(vec![Value::Int(26), Value::Int(29)]),
                Value::Array(vec![Value::Int(46), Value::Int(49)]),
            ]),
        );

        test_err(
            &DataType::List(DataType::Boolean.into()),
            (),
            r#"{"type": "array", "items": "int"}"#,
            "encode '' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_union() {
        // Covers every union shape `on_field` distinguishes: null-left, null-right,
        // single-variant, empty, and unsupported multi-variant unions.
        let t = &DataType::Timestamptz;
        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let empty = "[]";
        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;

        test_ok(
            t,
            datum.clone(),
            opt_micros,
            Value::Union(1, Value::TimestampMicros(1500).into()),
        );
        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
        test_ok(
            t,
            datum.clone(),
            opt_millis,
            Value::Union(1, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));

        test_err(
            t,
            datum.to_datum_ref(),
            both,
            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
        );

        test_err(
            t,
            datum.to_datum_ref(),
            empty,
            "encode '' error: cannot encode timestamp with time zone column as [] field",
        );

        test_ok(
            t,
            datum.clone(),
            one,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_err(t, None, one, "encode '' error: found null but required");

        test_ok(
            t,
            datum.clone(),
            right,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
    }

    // Records `apache_avro` behavior when a record leaves a union field unset (as
    // `Record::new` created it).
    // 1. Putting a bare `Int` into union field `f1` writes data that fails to read back.
    // 2. Putting a properly wrapped `Union` value reads back with the value under `f0`
    //    instead of `f1`.
    // Presumably this is why `encode_fields` always fills omitted fields with an explicit
    // null union branch.
    #[test]
    fn test_encode_avro_lib_bug() {
        use apache_avro::{Reader, Writer};

        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {
                        "name": "f0",
                        "type": ["null", "int"]
                    },
                    {
                        "name": "f1",
                        "type": ["null", "int"]
                    }
                ]
            }"#,
        )
        .unwrap();

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Int(3));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap_err().to_string(),
                "Union index 3 out of bounds: 2"
            );
        }

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Union(1, Value::Int(3).into()));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap(),
                Value::Record(vec![
                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
                    ("f1".into(), Value::Union(0, Value::Null.into())),
                ])
            );
        }
    }
}