1use std::collections::HashMap;
16use std::sync::Arc;
17
18use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
19use apache_avro::types::{Record, Value};
20use risingwave_common::array::VECTOR_AS_LIST_TYPE;
21use risingwave_common::catalog::Schema;
22use risingwave_common::row::Row;
23use risingwave_common::types::{DataType, DatumRef, ListType, ScalarRefImpl, StructType};
24use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
25use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
26use thiserror_ext::AsReport;
27
28use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
29
/// Result alias used by the field-level validation and encoding helpers in this module;
/// the error is always a [`FieldEncodeError`].
type Result<T> = std::result::Result<T, FieldEncodeError>;
/// Owned table of the named schemas collected from a root Avro schema, keyed by name.
///
/// It is consulted by `lookup` to replace an `AvroSchema::Ref` node with the schema it
/// refers to.
struct NamesRef(HashMap<Name, AvroSchema>);
32
/// Row encoder that turns RisingWave rows into Avro records matching a given Avro schema.
pub struct AvroEncoder {
    /// RisingWave schema of the rows handed to the encoder.
    schema: Schema,
    /// Optional subset of column indices to encode; `None` means every column of `schema`
    /// is validated against the Avro schema on construction.
    col_indices: Option<Vec<usize>>,
    /// Target Avro schema; it must resolve to a record.
    avro_schema: Arc<AvroSchema>,
    /// Named schemas resolved from `avro_schema`, used to follow `AvroSchema::Ref` nodes.
    refs: NamesRef,
    /// Framing to attach to each encoded value when it is serialized to bytes.
    header: AvroHeader,
}
40
/// Framing written in front of the Avro datum when an [`AvroEncoded`] is serialized.
///
/// Only the two schema-registry variants are currently accepted by `ser_to`; the other
/// variants make it return an "unsupported yet" encode error.
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    /// No header. Rejected by `ser_to` for now.
    None,
    /// Rejected by `ser_to` for now.
    /// NOTE(review): presumably Avro "single object encoding" — confirm.
    SingleObject,
    /// Rejected by `ser_to` for now.
    /// NOTE(review): presumably the Avro object container file format — confirm.
    ContainerFile,
    /// Serialized as a `0` byte followed by the 4-byte schema id, then the datum.
    ConfluentSchemaRegistry(i32),
    /// Serialized as the bytes `3`, `0` followed by the 16 bytes of the schema version id,
    /// then the datum.
    GlueSchemaRegistry(uuid::Uuid),
}
67
68impl AvroEncoder {
69 pub fn new(
70 schema: Schema,
71 col_indices: Option<Vec<usize>>,
72 avro_schema: Arc<AvroSchema>,
73 header: AvroHeader,
74 ) -> SinkResult<Self> {
75 let refs = NamesRef::new(&avro_schema)?;
76 match &col_indices {
77 Some(col_indices) => validate_fields(
78 col_indices.iter().map(|idx| {
79 let f = &schema[*idx];
80 (f.name.as_str(), &f.data_type)
81 }),
82 &avro_schema,
83 &refs,
84 )?,
85 None => validate_fields(
86 schema
87 .fields
88 .iter()
89 .map(|f| (f.name.as_str(), &f.data_type)),
90 &avro_schema,
91 &refs,
92 )?,
93 };
94
95 Ok(Self {
96 schema,
97 col_indices,
98 avro_schema,
99 refs,
100 header,
101 })
102 }
103}
104
105impl NamesRef {
106 fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
107 let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
108 let refs = resolved
109 .get_names()
110 .iter()
111 .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
112 .collect();
113 Ok(Self(refs))
114 }
115
116 fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
117 match avro {
118 AvroSchema::Ref { name } => &self.0[name],
119 _ => avro,
120 }
121 }
122}
123
/// Output of [`AvroEncoder`]: an Avro value that has not been serialized to bytes yet.
pub struct AvroEncoded {
    /// The encoded Avro value (a record when produced by `encode_cols`).
    value: Value,
    /// Schema the value is serialized against.
    schema: Arc<AvroSchema>,
    /// Framing to prepend to the serialized datum.
    header: AvroHeader,
}
129
130impl RowEncoder for AvroEncoder {
131 type Output = AvroEncoded;
132
133 fn schema(&self) -> &Schema {
134 &self.schema
135 }
136
137 fn col_indices(&self) -> Option<&[usize]> {
138 self.col_indices.as_deref()
139 }
140
141 fn encode_cols(
142 &self,
143 row: impl Row,
144 col_indices: impl Iterator<Item = usize>,
145 ) -> SinkResult<Self::Output> {
146 let record = encode_fields(
147 col_indices.map(|idx| {
148 let f = &self.schema[idx];
149 ((f.name.as_str(), &f.data_type), row.datum_at(idx))
150 }),
151 &self.avro_schema,
152 &self.refs,
153 )?;
154 Ok(AvroEncoded {
155 value: record.into(),
156 schema: self.avro_schema.clone(),
157 header: self.header,
158 })
159 }
160}
161
162impl SerTo<Vec<u8>> for AvroEncoded {
163 fn ser_to(self) -> SinkResult<Vec<u8>> {
164 use bytes::BufMut as _;
165
166 let header = match self.header {
167 AvroHeader::ConfluentSchemaRegistry(schema_id) => {
168 let mut buf = Vec::with_capacity(1 + 4);
169 buf.put_u8(0);
170 buf.put_i32(schema_id);
171 buf
172 }
173 AvroHeader::GlueSchemaRegistry(schema_version_id) => {
174 let mut buf = Vec::with_capacity(1 + 1 + 16);
175 buf.put_u8(3);
176 buf.put_u8(0);
177 buf.put_slice(schema_version_id.as_bytes());
178 buf
179 }
180 AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
181 return Err(crate::sink::SinkError::Encode(format!(
182 "{:?} unsupported yet",
183 self.header
184 )));
185 }
186 };
187
188 let raw = apache_avro::to_avro_datum(&self.schema, self.value)
189 .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
190 let mut buf = Vec::with_capacity(header.len() + raw.len());
191 buf.put_slice(&header);
192 buf.put_slice(&raw);
193
194 Ok(buf)
195 }
196}
197
/// How the expected Avro schema of a field relates to a nullable union, as classified by
/// `on_field`.
enum OptIdx {
    /// The expected schema is not a union at all.
    NotUnion,
    /// The expected schema is a union with exactly one variant.
    Single,
    /// The expected schema is a two-variant union whose first variant is `null`.
    NullLeft,
    /// The expected schema is a two-variant union whose second variant is `null`.
    NullRight,
}
208
/// Abstraction over "a datum" and "no datum", so that `on_field` can be used both to
/// validate a type pairing and to encode an actual value.
///
/// In this file it is implemented for `()` (validation only, `Out = ()`) and for
/// `DatumRef` (encoding, `Out = Value`).
trait MaybeData: std::fmt::Debug {
    /// What processing one field yields.
    type Out;

    /// Handles a non-nested type; `f` converts a scalar into an Avro value.
    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;

    /// Handles a struct column against the Avro schema `avro`.
    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a list column; `avro` is the schema of the array items.
    fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    /// Handles a map column; `avro_value_schema` is the schema of the map values.
    fn on_map(
        self,
        value_type: &DataType,
        avro_value_schema: &AvroSchema,
        refs: &NamesRef,
    ) -> Result<Self::Out>;

    /// Post-processes `out` according to how the expected schema was classified in `opt_idx`.
    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}
233
234impl MaybeData for () {
235 type Out = ();
236
237 fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
238 Ok(self)
239 }
240
241 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
242 validate_fields(st.iter(), avro, refs)
243 }
244
245 fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
246 on_field(lt.elem(), (), avro, refs)
247 }
248
249 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
250 on_field(elem, (), avro, refs)
251 }
252
253 fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
254 Ok(out)
255 }
256}
257
258impl MaybeData for DatumRef<'_> {
259 type Out = Value;
260
261 fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
262 match self {
263 Some(s) => f(s),
264 None => Ok(Value::Null),
265 }
266 }
267
268 fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
269 let d = match self {
270 Some(s) => s.into_struct(),
271 None => return Ok(Value::Null),
272 };
273 let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
274 Ok(record.into())
275 }
276
277 fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
278 let d = match self {
279 Some(s) => s.into_list(),
280 None => return Ok(Value::Null),
281 };
282 let vs = d
283 .iter()
284 .map(|d| on_field(lt.elem(), d, avro, refs))
285 .try_collect()?;
286 Ok(Value::Array(vs))
287 }
288
289 fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
290 let d = match self {
291 Some(s) => s.into_map(),
292 None => return Ok(Value::Null),
293 };
294 let vs = d
295 .iter()
296 .map(|(k, v)| {
297 let k = k.into_utf8().to_owned();
298 let v = on_field(elem, v, avro, refs)?;
299 Ok((k, v))
300 })
301 .try_collect()?;
302 Ok(Value::Map(vs))
303 }
304
305 fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
306 use OptIdx::*;
307
308 match out == Value::Null {
309 true => {
310 let ni = match opt_idx {
311 NotUnion | Single => {
312 return Err(FieldEncodeError::new("found null but required"));
313 }
314 NullLeft => 0,
315 NullRight => 1,
316 };
317 Ok(Value::Union(ni, out.into()))
318 }
319 false => {
320 let vi = match opt_idx {
321 NotUnion => return Ok(out),
322 NullLeft => 1,
323 Single | NullRight => 0,
324 };
325 Ok(Value::Union(vi, out.into()))
326 }
327 }
328 }
329}
330
331fn validate_fields<'rw>(
332 rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
333 avro: &AvroSchema,
334 refs: &NamesRef,
335) -> Result<()> {
336 let avro = refs.lookup(avro);
337 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
338 return Err(FieldEncodeError::new(format!(
339 "expect avro record but got {}",
340 avro.canonical_form(),
341 )));
342 };
343 let mut present = vec![false; fields.len()];
344 for (name, t) in rw_fields {
345 let Some(&idx) = lookup.get(name) else {
346 return Err(FieldEncodeError::new("field not in avro").with_name(name));
347 };
348 present[idx] = true;
349 let avro_field = &fields[idx];
350 on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
351 }
352 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
353 if p {
354 continue;
355 }
356 if !avro_field.is_nullable() {
357 return Err(
358 FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
359 );
360 }
361 }
362 Ok(())
363}
364
365fn encode_fields<'avro, 'rw>(
366 fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
367 schema: &'avro AvroSchema,
368 refs: &'avro NamesRef,
369) -> Result<Record<'avro>> {
370 let schema = refs.lookup(schema);
371 let mut record = Record::new(schema).unwrap();
372 let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
373 unreachable!()
374 };
375 let mut present = vec![false; fields.len()];
376 for ((name, t), d) in fields_with_datums {
377 let idx = lookup[name];
378 present[idx] = true;
379 let avro_field = &fields[idx];
380 let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
381 record.put(name, value);
382 }
383 for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
387 if p {
388 continue;
389 }
390 let AvroSchema::Union(u) = &avro_field.schema else {
391 unreachable!()
392 };
393 let ni = u
397 .variants()
398 .iter()
399 .position(|a| a == &AvroSchema::Null)
400 .unwrap();
401 record.put(
402 &avro_field.name,
403 Value::Union(ni.try_into().unwrap(), Value::Null.into()),
404 );
405 }
406 Ok(record)
407}
408
/// Validates or encodes a single field of type `data_type` against the Avro schema
/// `expected`.
///
/// The work actually done depends on `maybe`: with `()` only the type pairing is checked,
/// with a `DatumRef` the datum is also converted into an Avro `Value`.
///
/// `expected` may be a union of at most two variants, one of which may be `null`; the
/// non-null variant (after following a named reference through `refs`) is matched against
/// `data_type`, and the result is passed to `D::handle_nullable_union`.
///
/// # Errors
///
/// Returns a "cannot encode ... column as ... field" error when the type pairing is not
/// supported, and propagates any error from converting the value itself.
fn on_field<D: MaybeData>(
    data_type: &DataType,
    maybe: D,
    expected: &AvroSchema,
    refs: &NamesRef,
) -> Result<D::Out> {
    use risingwave_common::types::Interval;

    // Shared error for every unsupported (column type, Avro schema) pairing.
    let no_match_err = || {
        Err(FieldEncodeError::new(format!(
            "cannot encode {} column as {} field",
            data_type,
            expected.canonical_form()
        )))
    };

    // Peel off an optional union: only `[T]`, `[null, T]` and `[T, null]` are accepted.
    let (inner, opt_idx) = match expected {
        AvroSchema::Union(union) => match union.variants() {
            [] => return no_match_err(),
            [one] => (one, OptIdx::Single),
            [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
            [l, AvroSchema::Null] => (l, OptIdx::NullRight),
            _ => return no_match_err(),
        },
        _ => (expected, OptIdx::NotUnion),
    };

    // The non-null variant may itself be a reference to a named schema.
    let inner = refs.lookup(inner);

    let value = match &data_type {
        DataType::Boolean => match inner {
            AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
            _ => return no_match_err(),
        },
        DataType::Varchar => match inner {
            AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,

            // A varchar can feed an Avro enum when its text is exactly one of the symbols.
            AvroSchema::Enum(enum_schema) => maybe.on_base(|s| {
                let str_value = s.into_utf8();

                if let Some(position) = enum_schema
                    .symbols
                    .iter()
                    .position(|symbol| symbol == str_value)
                {
                    Ok(Value::Enum(position as u32, str_value.to_owned()))
                } else {
                    Err(FieldEncodeError::new(format!(
                        "Value '{}' is not a valid enum symbol. Valid symbols are: {:?}",
                        str_value, enum_schema.symbols
                    )))
                }
            })?,

            _ => return no_match_err(),
        },
        DataType::Bytea => match inner {
            AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float32 => match inner {
            AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float64 => match inner {
            AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
            _ => return no_match_err(),
        },
        DataType::Int32 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
            _ => return no_match_err(),
        },
        DataType::Int64 => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
            _ => return no_match_err(),
        },
        DataType::Serial => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
            _ => return no_match_err(),
        },
        DataType::Struct(st) => match inner {
            AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
            _ => return no_match_err(),
        },
        DataType::List(lt) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(lt, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Map(m) => {
            // Only maps keyed by varchar are accepted.
            if *m.key() != DataType::Varchar {
                return no_match_err();
            }
            match inner {
                AvroSchema::Map(avro_value_type) => {
                    maybe.on_map(m.value(), avro_value_type, refs)?
                }
                _ => return no_match_err(),
            }
        }

        DataType::Timestamptz => match inner {
            AvroSchema::TimestampMicros => maybe.on_base(|s| {
                Ok(Value::TimestampMicros(
                    s.into_timestamptz().timestamp_micros(),
                ))
            })?,
            AvroSchema::TimestampMillis => maybe.on_base(|s| {
                Ok(Value::TimestampMillis(
                    s.into_timestamptz().timestamp_millis(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // Timestamp (without time zone) has no Avro mapping here.
        DataType::Timestamp => return no_match_err(),
        DataType::Date => match inner {
            AvroSchema::Date => {
                maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
            }
            _ => return no_match_err(),
        },
        DataType::Time => match inner {
            AvroSchema::TimeMicros => {
                maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
            }
            // Integer division: sub-millisecond precision is dropped.
            AvroSchema::TimeMillis => maybe.on_base(|s| {
                Ok(Value::TimeMillis(
                    (Interval::from(s.into_time()).usecs() / 1000)
                        .try_into()
                        .unwrap(),
                ))
            })?,
            _ => return no_match_err(),
        },
        DataType::Interval => match inner {
            AvroSchema::Duration => maybe.on_base(|s| {
                use apache_avro::{Days, Duration, Millis, Months};
                let iv = s.into_interval();

                // Any component that does not fit the target integer type is reported
                // as an overflow of the whole interval.
                let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));

                Ok(Value::Duration(Duration::new(
                    Months::new(iv.months().try_into().map_err(overflow)?),
                    Days::new(iv.days().try_into().map_err(overflow)?),
                    Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
                )))
            })?,
            _ => return no_match_err(),
        },
        DataType::Int16 => match inner {
            // Widened to a 32-bit Avro int.
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
            _ => return no_match_err(),
        },
        DataType::Decimal => match inner {
            AvroSchema::Decimal(decimal_schema) => {
                maybe.on_base(|s| {
                    match s.into_decimal() {
                        risingwave_common::types::Decimal::Normalized(decimal) => {
                            // Rescale to the scale declared by the Avro schema.
                            let signed_bigint_bytes =
                                rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
                                    .map_err(FieldEncodeError::new)?;
                            Ok(Value::Decimal(apache_avro::Decimal::from(
                                &signed_bigint_bytes,
                            )))
                        }
                        d @ risingwave_common::types::Decimal::NaN
                        | d @ risingwave_common::types::Decimal::NegativeInf
                        | d @ risingwave_common::types::Decimal::PositiveInf => {
                            Err(FieldEncodeError::new(format!(
                                "Avro Decimal does not support NaN or Inf, but got {}",
                                d
                            )))
                        }
                    }
                })?
            }
            _ => return no_match_err(),
        },
        DataType::Jsonb => match inner {
            // JSONB is written as its textual form.
            AvroSchema::String => {
                maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
            }
            _ => return no_match_err(),
        },
        DataType::Vector(_) => match inner {
            // A vector is handled like a list, using the shared vector-as-list type.
            AvroSchema::Array(avro_elem) => maybe.on_list(&VECTOR_AS_LIST_TYPE, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Int256 => {
            // No Avro mapping is provided for Int256.
            return no_match_err();
        }
    };

    D::handle_nullable_union(value, opt_idx)
}
618
619#[cfg(test)]
620mod tests {
621 use std::collections::HashMap;
622 use std::str::FromStr;
623
624 use expect_test::expect;
625 use itertools::Itertools;
626 use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
627 use risingwave_common::catalog::Field;
628 use risingwave_common::row::OwnedRow;
629 use risingwave_common::types::{
630 Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
631 StructValue, Time, Timestamptz, ToDatumRef,
632 };
633
634 use super::*;
635
636 #[track_caller]
637 fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
638 let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
639 let refs = NamesRef::new(&avro_schema).unwrap();
640 let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
641 assert_eq!(actual, expected);
642 }
643
644 #[track_caller]
645 fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
646 where
647 D::Out: std::fmt::Debug,
648 {
649 let avro_schema = AvroSchema::parse_str(avro).unwrap();
650 let refs = NamesRef::new(&avro_schema).unwrap();
651 let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
652 assert_eq!(err.to_string(), expected);
653 }
654
655 #[track_caller]
656 fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
657 let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
658 let refs = NamesRef::new(&avro_schema).unwrap();
659 let rw_type = DataType::from_str(rw_type).unwrap();
660 let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();
661
662 if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
663 expected.assert_debug_eq(&validate_err);
664 return;
665 }
666 let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
667 match actual {
668 Ok(v) => expected.assert_eq(&print_avro_value(&v)),
669 Err(e) => expected.assert_debug_eq(&e),
670 }
671 }
672
673 fn print_avro_value(v: &Value) -> String {
674 match v {
675 Value::Map(m) => {
676 let mut res = "Map({".to_owned();
677 for (k, v) in m.iter().sorted_by_key(|x| x.0) {
678 res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
679 }
680 res.push_str("})");
681 res
682 }
683 _ => format!("{v:?}"),
684 }
685 }
686
    /// Snapshot tests for `on_field` driven by textual types and values.
    #[test]
    fn test_encode_v2() {
        // Type mismatch is rejected at validation time.
        test_v2(
            "boolean",
            "false",
            r#""int""#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode boolean column as \"int\" field",
                    rev_path: [],
                }
            "#]],
        );
        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);

        // Map with non-null values.
        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:2,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
        );

        // A null map value is rejected when the value schema is not nullable.
        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "found null but required",
                    rev_path: [],
                }
            "#]],
        );

        // The same data is accepted when the value schema is a nullable union.
        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[
                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
            ]],
        );

        // Maps whose key type is not varchar are rejected.
        test_v2(
            "map(int,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
                    rev_path: [],
                }
            "#]],
        );
    }
742
    /// Successful encodings for each supported (column type, Avro schema) pairing.
    #[test]
    fn test_encode_avro_ok() {
        test_ok(
            &DataType::Boolean,
            Some(ScalarImpl::Bool(false)),
            r#""boolean""#,
            Value::Boolean(false),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RisingWave".into())),
            r#""string""#,
            Value::String("RisingWave".into()),
        );

        test_ok(
            &DataType::Bytea,
            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
            r#""bytes""#,
            Value::Bytes([0xbe, 0xef].into()),
        );

        test_ok(
            &DataType::Float32,
            Some(ScalarImpl::Float32(3.5f32.into())),
            r#""float""#,
            Value::Float(3.5f32),
        );

        test_ok(
            &DataType::Float64,
            Some(ScalarImpl::Float64(4.25f64.into())),
            r#""double""#,
            Value::Double(4.25f64),
        );

        test_ok(
            &DataType::Int32,
            Some(ScalarImpl::Int32(16)),
            r#""int""#,
            Value::Int(16),
        );

        test_ok(
            &DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX)),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        test_ok(
            &DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into())),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        // Timestamptz maps to either timestamp-micros or timestamp-millis.
        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Value::TimestampMicros(tstz.timestamp_micros()),
        );
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            Value::TimestampMillis(tstz.timestamp_millis()),
        );

        test_ok(
            &DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
            r#"{"type": "int", "logicalType": "date"}"#,
            Value::Date(1),
        );

        // Time maps to either time-micros or time-millis.
        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            Value::TimeMicros(1000 * 1_000_000),
        );
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            Value::TimeMillis(1000 * 1000),
        );

        // Int16 is widened to an Avro int, at both ends of its range.
        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MAX)),
            r#""int""#,
            Value::Int(i16::MAX as i32),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MIN)),
            r#""int""#,
            Value::Int(i16::MIN as i32),
        );

        test_ok(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(
                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
            )),
            r#""string""#,
            Value::String(r#"{"a": 1}"#.into()),
        );

        test_ok(
            &DataType::Interval,
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
                13, 2, 1000000,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            Value::Duration(apache_avro::Duration::new(
                apache_avro::Months::new(13),
                apache_avro::Days::new(2),
                apache_avro::Millis::new(1000),
            )),
        );

        // Nested map: map(varchar, map(varchar, int)).
        let mut inner_map_array_builder = MapArrayBuilder::with_type(
            2,
            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
        );
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["a", "b"]),
                ListValue::from_iter([1, 2]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["c", "d"]),
                ListValue::from_iter([3, 4]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        let inner_map_array = inner_map_array_builder.finish();
        test_ok(
            &DataType::Map(MapType::from_kv(
                DataType::Varchar,
                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
            )),
            Some(ScalarImpl::Map(
                MapValue::try_from_kv(
                    ListValue::from_iter(["k1", "k2"]),
                    ListValue::new(inner_map_array.into()),
                )
                .unwrap(),
            )),
            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
            Value::Map(HashMap::from_iter([
                (
                    "k1".into(),
                    Value::Map(HashMap::from_iter([
                        ("a".into(), Value::Int(1)),
                        ("b".into(), Value::Int(2)),
                    ])),
                ),
                (
                    "k2".into(),
                    Value::Map(HashMap::from_iter([
                        ("c".into(), Value::Int(3)),
                        ("d".into(), Value::Int(4)),
                    ])),
                ),
            ])),
        );

        // Nested struct where the second field refers to the named record `Point`
        // defined by the first field.
        test_ok(
            &DataType::Struct(StructType::new(vec![
                (
                    "p",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
                (
                    "q",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
            ])),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(-2)),
                    Some(ScalarImpl::Int32(-1)),
                ]))),
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(2)),
                    Some(ScalarImpl::Int32(1)),
                ]))),
            ]))),
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {
                        "name": "p",
                        "type": {
                            "type": "record",
                            "name": "Point",
                            "fields": [
                                {
                                    "name": "x",
                                    "type": "int"
                                },
                                {
                                    "name": "y",
                                    "type": "int"
                                }
                            ]
                        }
                    },
                    {
                        "name": "q",
                        "type": "Point"
                    }
                ]
            }"#,
            Value::Record(vec![
                (
                    "p".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(-2)),
                        ("y".to_owned(), Value::Int(-1)),
                    ]),
                ),
                (
                    "q".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(2)),
                        ("y".to_owned(), Value::Int(1)),
                    ]),
                ),
            ]),
        );

        // Varchar encoded as an Avro enum: the symbol's position becomes the enum index.
        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RED".into())),
            r#"{"type": "enum", "name": "Color", "symbols": ["RED", "GREEN", "BLUE"]}"#,
            Value::Enum(0, "RED".to_owned()),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("BLUE".into())),
            r#"{"type": "enum", "name": "Color", "symbols": ["RED", "GREEN", "BLUE"]}"#,
            Value::Enum(2, "BLUE".to_owned()),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("ACTIVE".into())),
            r#"{"type": "enum", "name": "Status", "symbols": ["ACTIVE", "INACTIVE"]}"#,
            Value::Enum(0, "ACTIVE".to_owned()),
        );

        // A larger JSON document with nesting, arrays, null and non-ASCII text.
        let complex_json = r#"{
            "person": {
                "name": "John Doe",
                "age": 30,
                "address": {
                    "street": "123 Main St.",
                    "city": "New York",
                    "coordinates": [40.7128, -74.0060]
                },
                "contacts": [
                    {"type": "email", "value": "john@example.com"},
                    {"type": "phone", "value": "+1-555-123-4567"}
                ],
                "active": true,
                "preferences": {
                    "notifications": true,
                    "theme": "dark",
                    "languages": ["en", "es"],
                    "lastLogin": null
                },
                "tags": ["premium", "verified"],
                "unicode_test": "Hello, δΈη! π"
            }
        }"#;

        let input_json = JsonbVal::from_str(complex_json).unwrap();
        let result = on_field(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
            &AvroSchema::parse_str(r#""string""#).unwrap(),
            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
        )
        .unwrap();

        if let Value::String(result_str) = result {
            // Compare as parsed JSON so that key order and whitespace do not matter.
            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
            assert_eq!(
                expected_json, actual_json,
                "JSON values should be equivalent regardless of key order"
            );
        } else {
            panic!("Expected String value");
        };
    }
1065
    /// Error paths: an interval that does not fit an Avro duration, and serialization of
    /// a value that does not match its schema.
    #[test]
    fn test_encode_avro_err() {
        test_err(
            &DataType::Interval,
            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
                -1,
                -1,
                i64::MAX,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
        );

        // Build a record whose `int` field holds a string, bypassing the encoder, and
        // check that `ser_to` surfaces the mismatch as an encode error.
        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::String("2".into()));
        let res: SinkResult<Vec<u8>> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::ConfluentSchemaRegistry(42),
        }
        .ser_to();
        assert_eq!(
            res.unwrap_err().to_string(),
            "Encode error: Value does not match schema"
        );
    }
1098
    /// Record-level behavior of `AvroEncoder`: field ordering, omitted nullable fields,
    /// and the validation errors raised on construction.
    #[test]
    fn test_encode_avro_record() {
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {"name": "req", "type": "int"},
                    {"name": "opt", "type": ["null", "long"]}
                ]
            }"#,
        )
        .unwrap();
        let avro_schema = Arc::new(avro_schema);
        let header = AvroHeader::None;

        // Columns in a different order than the Avro fields: output follows the Avro order.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
        ]);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(31)),
            Some(ScalarImpl::Int32(15)),
        ]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(1, Value::Long(31).into())),
            ])
        );

        // The nullable Avro field has no column: it is filled with null.
        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(0, Value::Null.into())),
            ])
        );

        // The required Avro field has no column: construction fails.
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'req' error: field not present but required"
        );

        // A column without a matching Avro field: construction fails.
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
            Field::with_name(DataType::Varchar, "extra"),
        ]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'extra' error: field not in avro"
        );

        // The root Avro schema is not a record: construction fails.
        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
        );

        // A mismatch inside a struct is reported with the field name.
        test_err(
            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
            (),
            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
            "encode 'f0' error: cannot encode boolean column as \"int\" field",
        );
    }
1184
    /// List columns encoded as Avro arrays, including null elements and nested lists.
    #[test]
    fn test_encode_avro_array() {
        let avro_schema = r#"{
            "type": "array",
            "items": "int"
        }"#;

        test_ok(
            &DataType::Int32.list(),
            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
            avro_schema,
            Value::Array(vec![Value::Int(4), Value::Int(5)]),
        );

        // A null element is rejected when the item schema is not nullable.
        test_err(
            &DataType::Int32.list(),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
            avro_schema,
            "encode '' error: found null but required",
        );

        // The same list is accepted when the item schema is a nullable union.
        test_ok(
            &DataType::Int32.list(),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
            r#"{
                "type": "array",
                "items": ["null", "int"]
            }"#,
            Value::Array(vec![
                Value::Union(1, Value::Int(4).into()),
                Value::Union(0, Value::Null.into()),
            ]),
        );

        // List of lists.
        test_ok(
            &DataType::Int32.list().list(),
            Some(ScalarImpl::List(ListValue::from_iter([
                ListValue::from_iter([26, 29]),
                ListValue::from_iter([46, 49]),
            ]))),
            r#"{
                "type": "array",
                "items": {
                    "type": "array",
                    "items": "int"
                }
            }"#,
            Value::Array(vec![
                Value::Array(vec![Value::Int(26), Value::Int(29)]),
                Value::Array(vec![Value::Int(46), Value::Int(49)]),
            ]),
        );

        // Element type mismatch is caught in validation-only mode.
        test_err(
            &DataType::Boolean.list(),
            (),
            r#"{"type": "array", "items": "int"}"#,
            "encode '' error: cannot encode boolean column as \"int\" field",
        );
    }
1245
    /// Union handling: `[null, T]`, `[T, null]`, single-variant, empty and
    /// multi-non-null unions.
    #[test]
    fn test_encode_avro_union() {
        let t = &DataType::Timestamptz;
        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let empty = "[]";
        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;

        // `null` on the left: values use branch 1, nulls use branch 0.
        test_ok(
            t,
            datum.clone(),
            opt_micros,
            Value::Union(1, Value::TimestampMicros(1500).into()),
        );
        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
        test_ok(
            t,
            datum.clone(),
            opt_millis,
            Value::Union(1, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));

        // A union of two non-null variants is not supported.
        test_err(
            t,
            datum.to_datum_ref(),
            both,
            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
        );

        // Neither is an empty union.
        test_err(
            t,
            datum.to_datum_ref(),
            empty,
            "encode '' error: cannot encode timestamp with time zone column as [] field",
        );

        // Single-variant union: values use branch 0, nulls are rejected.
        test_ok(
            t,
            datum.clone(),
            one,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_err(t, None, one, "encode '' error: found null but required");

        // `null` on the right: values use branch 0, nulls use branch 1.
        test_ok(
            t,
            datum.clone(),
            right,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
    }
1302
    /// Pins down what the `apache_avro` writer/reader do when a record with two nullable
    /// fields only has `f1` set, as observed through the assertions below.
    #[test]
    fn test_encode_avro_lib_bug() {
        use apache_avro::{Reader, Writer};

        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {
                        "name": "f0",
                        "type": ["null", "int"]
                    },
                    {
                        "name": "f1",
                        "type": ["null", "int"]
                    }
                ]
            }"#,
        )
        .unwrap();

        // Case 1: `f1` is given a bare `Int` rather than a `Union`. Appending succeeds,
        // but reading the data back fails with an out-of-bounds union index.
        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Int(3));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap_err().to_string(),
                "Union index 3 out of bounds: 2"
            );
        }

        // Case 2: `f1` is given a proper `Union` while `f0` is left unset. Appending
        // succeeds, but the value read back has the int in `f0` and null in `f1`.
        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("f1", Value::Union(1, Value::Int(3).into()));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap(),
                Value::Record(vec![
                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
                    ("f1".into(), Value::Union(0, Value::Null.into())),
                ])
            );
        }
    }
1361}