1use std::collections::HashMap;
16use std::sync::Arc;
17
18use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
19use apache_avro::types::{Record, Value};
20use risingwave_common::array::VECTOR_ITEM_TYPE;
21use risingwave_common::catalog::Schema;
22use risingwave_common::row::Row;
23use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
24use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
25use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
26use thiserror_ext::AsReport;
27
28use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
29
/// Result alias for the field-level helpers in this module. Errors are
/// [`FieldEncodeError`]s; sink-level callers such as `AvroEncoder::new`
/// convert them into the sink error type with `?`.
type Result<T> = std::result::Result<T, FieldEncodeError>;
/// Owned table of the named Avro schema definitions reachable from a root
/// schema, keyed by name. Used to resolve `AvroSchema::Ref` nodes back to
/// their definitions.
struct NamesRef(HashMap<Name, AvroSchema>);
32
/// Encodes RisingWave rows into Avro record values described by a fixed Avro
/// schema.
///
/// The RisingWave columns are checked against the Avro schema once, when the
/// encoder is built in [`AvroEncoder::new`].
pub struct AvroEncoder {
    /// RisingWave schema of the rows handed to the encoder.
    schema: Schema,
    /// Optional subset of `schema` columns to use. With `None`, every column
    /// of `schema` is validated against the Avro schema.
    col_indices: Option<Vec<usize>>,
    /// Root Avro schema. Validation requires it to be a record, possibly
    /// reached through a named reference.
    avro_schema: Arc<AvroSchema>,
    /// Named types resolved from `avro_schema`.
    refs: NamesRef,
    /// Framing that `ser_to` prepends to each encoded datum.
    header: AvroHeader,
}
40
/// Framing written in front of each serialized Avro datum by
/// `SerTo<Vec<u8>> for AvroEncoded`.
///
/// Only the two schema-registry variants are serialized today. The other
/// variants make `ser_to` return an "unsupported yet" encode error.
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    /// No header. Not serializable yet.
    None,
    /// Presumably Avro single-object encoding. Not serializable yet.
    SingleObject,
    /// Presumably an Avro object container file. Not serializable yet.
    ContainerFile,
    /// Confluent Schema Registry framing: a `0` byte followed by the schema id
    /// written with `BufMut::put_i32` (big-endian).
    ConfluentSchemaRegistry(i32),
    /// AWS Glue Schema Registry framing: a `3` byte, a `0` byte, then the 16
    /// bytes of the schema version UUID. The two marker bytes are presumably
    /// Glue's header version and "no compression" flags; confirm against
    /// Glue docs.
    GlueSchemaRegistry(uuid::Uuid),
}
67
68impl AvroEncoder {
69 pub fn new(
70 schema: Schema,
71 col_indices: Option<Vec<usize>>,
72 avro_schema: Arc<AvroSchema>,
73 header: AvroHeader,
74 ) -> SinkResult<Self> {
75 let refs = NamesRef::new(&avro_schema)?;
76 match &col_indices {
77 Some(col_indices) => validate_fields(
78 col_indices.iter().map(|idx| {
79 let f = &schema[*idx];
80 (f.name.as_str(), &f.data_type)
81 }),
82 &avro_schema,
83 &refs,
84 )?,
85 None => validate_fields(
86 schema
87 .fields
88 .iter()
89 .map(|f| (f.name.as_str(), &f.data_type)),
90 &avro_schema,
91 &refs,
92 )?,
93 };
94
95 Ok(Self {
96 schema,
97 col_indices,
98 avro_schema,
99 refs,
100 header,
101 })
102 }
103}
104
105impl NamesRef {
106 fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
107 let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
108 let refs = resolved
109 .get_names()
110 .iter()
111 .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
112 .collect();
113 Ok(Self(refs))
114 }
115
116 fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
117 match avro {
118 AvroSchema::Ref { name } => &self.0[name],
119 _ => avro,
120 }
121 }
122}
123
/// An Avro value bundled with what is needed to serialize it: the writer
/// schema and the header to prepend.
pub struct AvroEncoded {
    /// The encoded row. `AvroEncoder::encode_cols` stores a record value here.
    value: Value,
    /// Schema used to serialize `value`.
    schema: Arc<AvroSchema>,
    /// Framing prepended by `ser_to`.
    header: AvroHeader,
}
129
130impl RowEncoder for AvroEncoder {
131 type Output = AvroEncoded;
132
133 fn schema(&self) -> &Schema {
134 &self.schema
135 }
136
137 fn col_indices(&self) -> Option<&[usize]> {
138 self.col_indices.as_deref()
139 }
140
141 fn encode_cols(
142 &self,
143 row: impl Row,
144 col_indices: impl Iterator<Item = usize>,
145 ) -> SinkResult<Self::Output> {
146 let record = encode_fields(
147 col_indices.map(|idx| {
148 let f = &self.schema[idx];
149 ((f.name.as_str(), &f.data_type), row.datum_at(idx))
150 }),
151 &self.avro_schema,
152 &self.refs,
153 )?;
154 Ok(AvroEncoded {
155 value: record.into(),
156 schema: self.avro_schema.clone(),
157 header: self.header,
158 })
159 }
160}
161
162impl SerTo<Vec<u8>> for AvroEncoded {
163 fn ser_to(self) -> SinkResult<Vec<u8>> {
164 use bytes::BufMut as _;
165
166 let header = match self.header {
167 AvroHeader::ConfluentSchemaRegistry(schema_id) => {
168 let mut buf = Vec::with_capacity(1 + 4);
169 buf.put_u8(0);
170 buf.put_i32(schema_id);
171 buf
172 }
173 AvroHeader::GlueSchemaRegistry(schema_version_id) => {
174 let mut buf = Vec::with_capacity(1 + 1 + 16);
175 buf.put_u8(3);
176 buf.put_u8(0);
177 buf.put_slice(schema_version_id.as_bytes());
178 buf
179 }
180 AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
181 return Err(crate::sink::SinkError::Encode(format!(
182 "{:?} unsupported yet",
183 self.header
184 )));
185 }
186 };
187
188 let raw = apache_avro::to_avro_datum(&self.schema, self.value)
189 .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
190 let mut buf = Vec::with_capacity(header.len() + raw.len());
191 buf.put_slice(&header);
192 buf.put_slice(&raw);
193
194 Ok(buf)
195 }
196}
197
/// Shape of the expected Avro schema with respect to nullability, as detected
/// by `on_field`. It decides which union branch index, if any, wraps a value.
enum OptIdx {
    /// Not a union: values are written bare and null is rejected.
    NotUnion,
    /// Single-variant union `[T]`: values use branch 0 and null is rejected.
    Single,
    /// Union `["null", T]`: null uses branch 0 and values use branch 1.
    NullLeft,
    /// Union `[T, "null"]`: values use branch 0 and null uses branch 1.
    NullRight,
}
208
/// Lets [`on_field`] share one type-matching routine between validation and
/// encoding.
///
/// The `()` implementation only checks types (`Out = ()`). The `DatumRef`
/// implementation also converts data into an Avro [`Value`] (`Out = Value`).
trait MaybeData: std::fmt::Debug {
    /// Result of visiting one field: `()` when validating, a [`Value`] when
    /// encoding.
    type Out;

    /// Handles a scalar type. `f` converts a present scalar into a `Value`.
    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;

    /// Handles a RisingWave struct matched against the Avro record `avro`.
    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a RisingWave list with element type `elem`. `avro` is the Avro
    /// array's item schema.
    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a RisingWave map; `on_field` only calls this for varchar keys.
    /// `avro_value_schema` is the Avro map's value schema.
    fn on_map(
        self,
        value_type: &DataType,
        avro_value_schema: &AvroSchema,
        refs: &NamesRef,
    ) -> Result<Self::Out>;

    /// Wraps `out` in the union branch selected by `opt_idx`, or rejects it.
    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}
233
234impl MaybeData for () {
235 type Out = ();
236
237 fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
238 Ok(self)
239 }
240
241 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
242 validate_fields(st.iter(), avro, refs)
243 }
244
245 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
246 on_field(elem, (), avro, refs)
247 }
248
249 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
250 on_field(elem, (), avro, refs)
251 }
252
253 fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
254 Ok(out)
255 }
256}
257
258impl MaybeData for DatumRef<'_> {
259 type Out = Value;
260
261 fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
262 match self {
263 Some(s) => f(s),
264 None => Ok(Value::Null),
265 }
266 }
267
268 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
269 let d = match self {
270 Some(s) => s.into_struct(),
271 None => return Ok(Value::Null),
272 };
273 let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
274 Ok(record.into())
275 }
276
277 fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
278 let d = match self {
279 Some(s) => s.into_list(),
280 None => return Ok(Value::Null),
281 };
282 let vs = d
283 .iter()
284 .map(|d| on_field(elem, d, avro, refs))
285 .try_collect()?;
286 Ok(Value::Array(vs))
287 }
288
289 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
290 let d = match self {
291 Some(s) => s.into_map(),
292 None => return Ok(Value::Null),
293 };
294 let vs = d
295 .iter()
296 .map(|(k, v)| {
297 let k = k.into_utf8().to_owned();
298 let v = on_field(elem, v, avro, refs)?;
299 Ok((k, v))
300 })
301 .try_collect()?;
302 Ok(Value::Map(vs))
303 }
304
305 fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
306 use OptIdx::*;
307
308 match out == Value::Null {
309 true => {
310 let ni = match opt_idx {
311 NotUnion | Single => {
312 return Err(FieldEncodeError::new("found null but required"));
313 }
314 NullLeft => 0,
315 NullRight => 1,
316 };
317 Ok(Value::Union(ni, out.into()))
318 }
319 false => {
320 let vi = match opt_idx {
321 NotUnion => return Ok(out),
322 NullLeft => 1,
323 Single | NullRight => 0,
324 };
325 Ok(Value::Union(vi, out.into()))
326 }
327 }
328 }
329}
330
331fn validate_fields<'rw>(
332 rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
333 avro: &AvroSchema,
334 refs: &NamesRef,
335) -> Result<()> {
336 let avro = refs.lookup(avro);
337 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
338 return Err(FieldEncodeError::new(format!(
339 "expect avro record but got {}",
340 avro.canonical_form(),
341 )));
342 };
343 let mut present = vec![false; fields.len()];
344 for (name, t) in rw_fields {
345 let Some(&idx) = lookup.get(name) else {
346 return Err(FieldEncodeError::new("field not in avro").with_name(name));
347 };
348 present[idx] = true;
349 let avro_field = &fields[idx];
350 on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
351 }
352 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
353 if p {
354 continue;
355 }
356 if !avro_field.is_nullable() {
357 return Err(
358 FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
359 );
360 }
361 }
362 Ok(())
363}
364
365fn encode_fields<'avro, 'rw>(
366 fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
367 schema: &'avro AvroSchema,
368 refs: &'avro NamesRef,
369) -> Result<Record<'avro>> {
370 let schema = refs.lookup(schema);
371 let mut record = Record::new(schema).unwrap();
372 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
373 unreachable!()
374 };
375 let mut present = vec![false; fields.len()];
376 for ((name, t), d) in fields_with_datums {
377 let idx = lookup[name];
378 present[idx] = true;
379 let avro_field = &fields[idx];
380 let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
381 record.put(name, value);
382 }
383 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
387 if p {
388 continue;
389 }
390 let AvroSchema::Union(u) = &avro_field.schema else {
391 unreachable!()
392 };
393 let ni = u
397 .variants()
398 .iter()
399 .position(|a| a == &AvroSchema::Null)
400 .unwrap();
401 record.put(
402 &avro_field.name,
403 Value::Union(ni.try_into().unwrap(), Value::Null.into()),
404 );
405 }
406 Ok(record)
407}
408
/// Matches one RisingWave column type against its expected Avro schema.
///
/// Depending on `D`, it either only validates the pairing (`D = ()`) or also
/// converts the datum into an Avro [`Value`] (`D = DatumRef`).
///
/// `expected` may take one of three shapes:
/// - a bare schema;
/// - a one-variant union;
/// - a two-variant union with `null` on either side.
///
/// The non-null branch is resolved through `refs`. The result is then passed
/// to [`MaybeData::handle_nullable_union`] for union wrapping.
///
/// # Errors
/// Returns "cannot encode ... column as ... field" for unsupported pairings.
/// This includes other union shapes and the `timestamp` and `int256` types.
///
/// It also propagates errors raised while converting the datum, for example:
/// - an interval that overflows an Avro duration;
/// - a NaN or infinite decimal;
/// - a null for a schema without a null branch.
fn on_field<D: MaybeData>(
    // RisingWave type of the column being matched.
    data_type: &DataType,
    // `()` to validate only, or the column's datum to encode.
    maybe: D,
    // Avro schema the column is written into; may be a union or a named reference.
    expected: &AvroSchema,
    // Named types of the root schema, used to resolve `AvroSchema::Ref`.
    refs: &NamesRef,
) -> Result<D::Out> {
    use risingwave_common::types::Interval;

    // Generic "unsupported pairing" error. It quotes the full expected schema,
    // including any union wrapper.
    let no_match_err = || {
        Err(FieldEncodeError::new(format!(
            "cannot encode {} column as {} field",
            data_type,
            expected.canonical_form()
        )))
    };

    // Peel off the nullable-union wrapper. Remember where the `null` branch
    // sits, if anywhere, so the value can be wrapped with the right index later.
    let (inner, opt_idx) = match expected {
        AvroSchema::Union(union) => match union.variants() {
            [] => return no_match_err(),
            [one] => (one, OptIdx::Single),
            [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
            [l, AvroSchema::Null] => (l, OptIdx::NullRight),
            // Any other shape (two non-null branches, or three or more
            // variants) is not supported.
            _ => return no_match_err(),
        },
        _ => (expected, OptIdx::NotUnion),
    };

    // The selected branch may name a type defined elsewhere in the schema.
    let inner = refs.lookup(inner);

    let value = match &data_type {
        DataType::Boolean => match inner {
            AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
            _ => return no_match_err(),
        },
        DataType::Varchar => match inner {
            AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,
            _ => return no_match_err(),
        },
        DataType::Bytea => match inner {
            AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float32 => match inner {
            AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float64 => match inner {
            AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
            _ => return no_match_err(),
        },
        DataType::Int32 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
            _ => return no_match_err(),
        },
        DataType::Int64 => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
            _ => return no_match_err(),
        },
        // Serial values are written as their underlying i64.
        DataType::Serial => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
            _ => return no_match_err(),
        },
        // Structs recurse into the (already resolved) Avro record.
        DataType::Struct(st) => match inner {
            AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
            _ => return no_match_err(),
        },
        DataType::List(elem) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(elem, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        // Avro map keys are strings, so only varchar-keyed maps are accepted.
        DataType::Map(m) => {
            if *m.key() != DataType::Varchar {
                return no_match_err();
            }
            match inner {
                AvroSchema::Map(avro_value_type) => {
                    maybe.on_map(m.value(), avro_value_type, refs)?
                }
                _ => return no_match_err(),
            }
        }

        // Timestamptz accepts either timestamp-micros or timestamp-millis.
        DataType::Timestamptz => match inner {
            AvroSchema::TimestampMicros => maybe.on_base(|s| {
                Ok(Value::TimestampMicros(
                    s.into_timestamptz().timestamp_micros(),
                ))
            })?,
            AvroSchema::TimestampMillis => maybe.on_base(|s| {
                Ok(Value::TimestampMillis(
                    s.into_timestamptz().timestamp_millis(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // No Avro schema is accepted for `timestamp` without time zone.
        DataType::Timestamp => return no_match_err(),
        DataType::Date => match inner {
            AvroSchema::Date => {
                maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
            }
            _ => return no_match_err(),
        },
        // Time of day is converted through `Interval` to get microseconds.
        DataType::Time => match inner {
            AvroSchema::TimeMicros => {
                maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
            }
            // Truncates to whole milliseconds. The `unwrap` assumes a time of
            // day in milliseconds fits in i32. NOTE(review): presumably true,
            // since a day is under 86_400_000 ms; confirm `Time`'s range.
            AvroSchema::TimeMillis => maybe.on_base(|s| {
                Ok(Value::TimeMillis(
                    (Interval::from(s.into_time()).usecs() / 1000)
                        .try_into()
                        .unwrap(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // Avro durations hold three unsigned 32-bit parts (months, days,
        // millis). Negative or too-large components are reported as overflow,
        // and sub-millisecond precision is truncated.
        DataType::Interval => match inner {
            AvroSchema::Duration => maybe.on_base(|s| {
                use apache_avro::{Days, Duration, Millis, Months};
                let iv = s.into_interval();

                let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));

                Ok(Value::Duration(Duration::new(
                    Months::new(iv.months().try_into().map_err(overflow)?),
                    Days::new(iv.days().try_into().map_err(overflow)?),
                    Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
                )))
            })?,
            _ => return no_match_err(),
        },
        // Avro has no 16-bit integer; widen to `int`.
        DataType::Int16 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
            _ => return no_match_err(),
        },
        DataType::Decimal => match inner {
            AvroSchema::Decimal(decimal_schema) => {
                maybe.on_base(|s| {
                    match s.into_decimal() {
                        risingwave_common::types::Decimal::Normalized(decimal) => {
                            // Rescale to the Avro schema's `scale`. The schema's
                            // precision is not checked here.
                            let signed_bigint_bytes =
                                rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
                                    .map_err(FieldEncodeError::new)?;
                            Ok(Value::Decimal(apache_avro::Decimal::from(
                                &signed_bigint_bytes,
                            )))
                        }
                        // Special values have no Avro decimal representation.
                        d @ risingwave_common::types::Decimal::NaN
                        | d @ risingwave_common::types::Decimal::NegativeInf
                        | d @ risingwave_common::types::Decimal::PositiveInf => {
                            Err(FieldEncodeError::new(format!(
                                "Avro Decimal does not support NaN or Inf, but got {}",
                                d
                            )))
                        }
                    }
                })?
            }
            _ => return no_match_err(),
        },
        // JSONB is written as its text form in an Avro string.
        DataType::Jsonb => match inner {
            AvroSchema::String => {
                maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
            }
            _ => return no_match_err(),
        },
        // Vectors are written as an Avro array of `VECTOR_ITEM_TYPE` items.
        // NOTE(review): when encoding, this path reaches `DatumRef::on_list`,
        // which calls `into_list()` on the scalar. Confirm a vector scalar
        // converts to a list there rather than panicking.
        DataType::Vector(_) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(&VECTOR_ITEM_TYPE, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Int256 => {
            // No Avro mapping is provided for 256-bit integers.
            return no_match_err();
        }
    };

    D::handle_nullable_union(value, opt_idx)
}
599
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use expect_test::expect;
    use itertools::Itertools;
    use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
        StructValue, Time, Timestamptz, ToDatumRef,
    };

    use super::*;

    /// Encodes `rw_datum` (of type `rw_type`) against the Avro schema text
    /// `avro_type` and asserts the resulting Avro value equals `expected`.
    #[track_caller]
    fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
        assert_eq!(actual, expected);
    }

    /// Runs `on_field` with `d` and asserts it fails with exactly the message
    /// `expected`. `d` is `()` to test validation or a datum to test encoding.
    #[track_caller]
    fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
    where
        D::Out: std::fmt::Debug,
    {
        let avro_schema = AvroSchema::parse_str(avro).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
        assert_eq!(err.to_string(), expected);
    }

    /// Snapshot helper. It parses the RisingWave type and scalar from text,
    /// validates, then encodes. It compares one of three things against
    /// `expected`: the validation error, the encoding error, or the printed
    /// Avro value.
    #[track_caller]
    fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let rw_type = DataType::from_str(rw_type).unwrap();
        let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();

        // A validation failure short-circuits: its error becomes the snapshot.
        if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
            expected.assert_debug_eq(&validate_err);
            return;
        }
        let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
        match actual {
            Ok(v) => expected.assert_eq(&print_avro_value(&v)),
            Err(e) => expected.assert_debug_eq(&e),
        }
    }

    /// Debug-formats an Avro value. Map entries are printed sorted by key, and
    /// map values are printed recursively the same way, so the output does not
    /// depend on `HashMap` iteration order.
    fn print_avro_value(v: &Value) -> String {
        match v {
            Value::Map(m) => {
                let mut res = "Map({".to_owned();
                for (k, v) in m.iter().sorted_by_key(|x| x.0) {
                    res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
                }
                res.push_str("})");
                res
            }
            _ => format!("{v:?}"),
        }
    }

    #[test]
    fn test_encode_v2() {
        test_v2(
            "boolean",
            "false",
            r#""int""#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode boolean column as \"int\" field",
                    rev_path: [],
                }
            "#]],
        );
        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:2,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
        );

        // A null map value needs a nullable Avro value schema.
        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "found null but required",
                    rev_path: [],
                }
            "#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[
                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
            ]],
        );

        // Non-varchar map keys are rejected during validation.
        test_v2(
            "map(int,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
                    rev_path: [],
                }
            "#]],
        );
    }

    #[test]
    fn test_encode_avro_ok() {
        test_ok(
            &DataType::Boolean,
            Some(ScalarImpl::Bool(false)),
            r#""boolean""#,
            Value::Boolean(false),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RisingWave".into())),
            r#""string""#,
            Value::String("RisingWave".into()),
        );

        test_ok(
            &DataType::Bytea,
            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
            r#""bytes""#,
            Value::Bytes([0xbe, 0xef].into()),
        );

        test_ok(
            &DataType::Float32,
            Some(ScalarImpl::Float32(3.5f32.into())),
            r#""float""#,
            Value::Float(3.5f32),
        );

        test_ok(
            &DataType::Float64,
            Some(ScalarImpl::Float64(4.25f64.into())),
            r#""double""#,
            Value::Double(4.25f64),
        );

        test_ok(
            &DataType::Int32,
            Some(ScalarImpl::Int32(16)),
            r#""int""#,
            Value::Int(16),
        );

        test_ok(
            &DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX)),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        test_ok(
            &DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into())),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        // The same timestamptz encodes to either timestamp logical type.
        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Value::TimestampMicros(tstz.timestamp_micros()),
        );
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            Value::TimestampMillis(tstz.timestamp_millis()),
        );

        test_ok(
            &DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
            r#"{"type": "int", "logicalType": "date"}"#,
            Value::Date(1),
        );

        // 1000 seconds after midnight, in micros and millis.
        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            Value::TimeMicros(1000 * 1_000_000),
        );
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            Value::TimeMillis(1000 * 1000),
        );

        // Int16 widens to Avro int at both extremes.
        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MAX)),
            r#""int""#,
            Value::Int(i16::MAX as i32),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MIN)),
            r#""int""#,
            Value::Int(i16::MIN as i32),
        );

        test_ok(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(
                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
            )),
            r#""string""#,
            Value::String(r#"{"a": 1}"#.into()),
        );

        test_ok(
            &DataType::Interval,
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
                13, 2, 1000000,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            Value::Duration(apache_avro::Duration::new(
                apache_avro::Months::new(13),
                apache_avro::Days::new(2),
                apache_avro::Millis::new(1000),
            )),
        );

        // Nested maps: map(varchar, map(varchar, int)).
        let mut inner_map_array_builder = MapArrayBuilder::with_type(
            2,
            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
        );
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["a", "b"]),
                ListValue::from_iter([1, 2]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["c", "d"]),
                ListValue::from_iter([3, 4]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        let inner_map_array = inner_map_array_builder.finish();
        test_ok(
            &DataType::Map(MapType::from_kv(
                DataType::Varchar,
                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
            )),
            Some(ScalarImpl::Map(
                MapValue::try_from_kv(
                    ListValue::from_iter(["k1", "k2"]),
                    ListValue::new(inner_map_array.into()),
                )
                .unwrap(),
            )),
            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
            Value::Map(HashMap::from_iter([
                (
                    "k1".into(),
                    Value::Map(HashMap::from_iter([
                        ("a".into(), Value::Int(1)),
                        ("b".into(), Value::Int(2)),
                    ])),
                ),
                (
                    "k2".into(),
                    Value::Map(HashMap::from_iter([
                        ("c".into(), Value::Int(3)),
                        ("d".into(), Value::Int(4)),
                    ])),
                ),
            ])),
        );

        // Nested structs, where the second field refers to the named record
        // `Point` defined by the first (exercises `NamesRef::lookup`).
        test_ok(
            &DataType::Struct(StructType::new(vec![
                (
                    "p",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
                (
                    "q",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
            ])),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(-2)),
                    Some(ScalarImpl::Int32(-1)),
                ]))),
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(2)),
                    Some(ScalarImpl::Int32(1)),
                ]))),
            ]))),
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {
                        "name": "p",
                        "type": {
                            "type": "record",
                            "name": "Point",
                            "fields": [
                                {
                                    "name": "x",
                                    "type": "int"
                                },
                                {
                                    "name": "y",
                                    "type": "int"
                                }
                            ]
                        }
                    },
                    {
                        "name": "q",
                        "type": "Point"
                    }
                ]
            }"#,
            Value::Record(vec![
                (
                    "p".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(-2)),
                        ("y".to_owned(), Value::Int(-1)),
                    ]),
                ),
                (
                    "q".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(2)),
                        ("y".to_owned(), Value::Int(1)),
                    ]),
                ),
            ]),
        );

        // A nested JSONB document must survive the text round-trip.
        let complex_json = r#"{
            "person": {
                "name": "John Doe",
                "age": 30,
                "address": {
                    "street": "123 Main St.",
                    "city": "New York",
                    "coordinates": [40.7128, -74.0060]
                },
                "contacts": [
                    {"type": "email", "value": "john@example.com"},
                    {"type": "phone", "value": "+1-555-123-4567"}
                ],
                "active": true,
                "preferences": {
                    "notifications": true,
                    "theme": "dark",
                    "languages": ["en", "es"],
                    "lastLogin": null
                },
                "tags": ["premium", "verified"],
                "unicode_test": "Hello, δΈη! π"
            }
        }"#;

        let input_json = JsonbVal::from_str(complex_json).unwrap();
        let result = on_field(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
            &AvroSchema::parse_str(r#""string""#).unwrap(),
            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
        )
        .unwrap();

        if let Value::String(result_str) = result {
            // Compare parsed JSON rather than text: key order and whitespace
            // may differ after the JSONB round-trip.
            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
            assert_eq!(
                expected_json, actual_json,
                "JSON values should be equivalent regardless of key order"
            );
        } else {
            panic!("Expected String value");
        };
    }

    #[test]
    fn test_encode_avro_err() {
        // Negative months/days cannot fit the unsigned Avro duration parts.
        test_err(
            &DataType::Interval,
            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
                -1,
                -1,
                i64::MAX,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
        );

        // A hand-built record with a value of the wrong type is rejected by
        // `ser_to`.
        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::String("2".into()));
        let res: SinkResult<Vec<u8>> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::ConfluentSchemaRegistry(42),
        }
        .ser_to();
        assert_eq!(
            res.unwrap_err().to_string(),
            "Encode error: Value does not match schema"
        );
    }

    #[test]
    fn test_encode_avro_record() {
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {"name": "req", "type": "int"},
                    {"name": "opt", "type": ["null", "long"]}
                ]
            }"#,
        )
        .unwrap();
        let avro_schema = Arc::new(avro_schema);
        let header = AvroHeader::None;

        // Columns may come in a different order than the Avro fields; the
        // encoded record follows the Avro field order.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
        ]);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(31)),
            Some(ScalarImpl::Int32(15)),
        ]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(1, Value::Long(31).into())),
            ])
        );

        // A nullable Avro field with no column is filled with its null branch.
        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(0, Value::Null.into())),
            ])
        );

        // A required Avro field with no column is rejected at construction.
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'req' error: field not present but required"
        );

        // A column unknown to the Avro schema is rejected at construction.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
            Field::with_name(DataType::Varchar, "extra"),
        ]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'extra' error: field not in avro"
        );

        // The root Avro schema must be a record.
        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
        );

        // Errors inside a nested record carry the inner field name.
        test_err(
            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
            (),
            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
            "encode 'f0' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_array() {
        let avro_schema = r#"{
            "type": "array",
            "items": "int"
        }"#;

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
            avro_schema,
            Value::Array(vec![Value::Int(4), Value::Int(5)]),
        );

        // A null element needs nullable array items.
        test_err(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
            avro_schema,
            "encode '' error: found null but required",
        );

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
            r#"{
                "type": "array",
                "items": ["null", "int"]
            }"#,
            Value::Array(vec![
                Value::Union(1, Value::Int(4).into()),
                Value::Union(0, Value::Null.into()),
            ]),
        );

        test_ok(
            &DataType::List(DataType::List(DataType::Int32.into()).into()),
            Some(ScalarImpl::List(ListValue::from_iter([
                ListValue::from_iter([26, 29]),
                ListValue::from_iter([46, 49]),
            ]))),
            r#"{
                "type": "array",
                "items": {
                    "type": "array",
                    "items": "int"
                }
            }"#,
            Value::Array(vec![
                Value::Array(vec![Value::Int(26), Value::Int(29)]),
                Value::Array(vec![Value::Int(46), Value::Int(49)]),
            ]),
        );

        test_err(
            &DataType::List(DataType::Boolean.into()),
            (),
            r#"{"type": "array", "items": "int"}"#,
            "encode '' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_union() {
        let t = &DataType::Timestamptz;
        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let empty = "[]";
        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;

        // ["null", T]: value in branch 1, null in branch 0.
        test_ok(
            t,
            datum.clone(),
            opt_micros,
            Value::Union(1, Value::TimestampMicros(1500).into()),
        );
        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
        test_ok(
            t,
            datum.clone(),
            opt_millis,
            Value::Union(1, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));

        // Two non-null branches are not supported.
        test_err(
            t,
            datum.to_datum_ref(),
            both,
            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
        );

        test_err(
            t,
            datum.to_datum_ref(),
            empty,
            "encode '' error: cannot encode timestamp with time zone column as [] field",
        );

        // [T]: value in branch 0, null rejected.
        test_ok(
            t,
            datum.clone(),
            one,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_err(t, None, one, "encode '' error: found null but required");

        // [T, "null"]: value in branch 0, null in branch 1.
        test_ok(
            t,
            datum.clone(),
            right,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
    }

    #[test]
    // Documents an `apache_avro` pitfall that `encode_fields` works around.
    // `Writer::append` accepts records whose union fields hold non-union
    // values, but the written bytes do not decode back to the same record.
    fn test_encode_avro_lib_bug() {
        use apache_avro::{Reader, Writer};

        let avro_schema = AvroSchema::parse_str(
            // Two nullable int fields.
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {
                        "name": "f0",
                        "type": ["null", "int"]
                    },
                    {
                        "name": "f1",
                        "type": ["null", "int"]
                    }
                ]
            }"#,
        )
        .unwrap();

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        // `f0` is left as created by `Record::new`, and `f1` gets a bare
        // `Int` instead of a union value.
        record.put("f1", Value::Int(3));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        // Reading back fails; apparently the int payload is read as a union index.
        for value in reader {
            assert_eq!(
                value.unwrap_err().to_string(),
                "Union index 3 out of bounds: 2"
            );
        }

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        // This time `f1` gets a proper union value, but `f0` is still left unset.
        record.put("f1", Value::Union(1, Value::Int(3).into()));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        // The decoded data is shifted: `f1`'s value shows up under `f0`.
        for value in reader {
            assert_eq!(
                value.unwrap(),
                Value::Record(vec![
                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
                    ("f1".into(), Value::Union(0, Value::Null.into())),
                ])
            );
        }
    }
}