use std::sync::Arc;
use apache_avro::schema::{RecordSchema, Schema as AvroSchema};
use apache_avro::types::{Record, Value};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use thiserror_ext::AsReport;
use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
type Result<T> = std::result::Result<T, FieldEncodeError>;
pub struct AvroEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
avro_schema: Arc<AvroSchema>,
header: AvroHeader,
}
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
None,
SingleObject,
ContainerFile,
ConfluentSchemaRegistry(i32),
}
impl AvroEncoder {
pub fn new(
schema: Schema,
col_indices: Option<Vec<usize>>,
avro_schema: Arc<AvroSchema>,
header: AvroHeader,
) -> SinkResult<Self> {
match &col_indices {
Some(col_indices) => validate_fields(
col_indices.iter().map(|idx| {
let f = &schema[*idx];
(f.name.as_str(), &f.data_type)
}),
&avro_schema,
)?,
None => validate_fields(
schema
.fields
.iter()
.map(|f| (f.name.as_str(), &f.data_type)),
&avro_schema,
)?,
};
Ok(Self {
schema,
col_indices,
avro_schema,
header,
})
}
}
pub struct AvroEncoded {
value: Value,
schema: Arc<AvroSchema>,
header: AvroHeader,
}
impl RowEncoder for AvroEncoder {
type Output = AvroEncoded;
fn schema(&self) -> &Schema {
&self.schema
}
fn col_indices(&self) -> Option<&[usize]> {
self.col_indices.as_deref()
}
fn encode_cols(
&self,
row: impl Row,
col_indices: impl Iterator<Item = usize>,
) -> SinkResult<Self::Output> {
let record = encode_fields(
col_indices.map(|idx| {
let f = &self.schema[idx];
((f.name.as_str(), &f.data_type), row.datum_at(idx))
}),
&self.avro_schema,
)?;
Ok(AvroEncoded {
value: record.into(),
schema: self.avro_schema.clone(),
header: self.header,
})
}
}
impl SerTo<Vec<u8>> for AvroEncoded {
fn ser_to(self) -> SinkResult<Vec<u8>> {
use bytes::BufMut as _;
let AvroHeader::ConfluentSchemaRegistry(schema_id) = self.header else {
return Err(crate::sink::SinkError::Encode(format!(
"{:?} unsupported yet",
self.header
)));
};
let raw = apache_avro::to_avro_datum(&self.schema, self.value)
.map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
let mut buf = Vec::with_capacity(1 + 4 + raw.len());
buf.put_u8(0);
buf.put_i32(schema_id);
buf.put_slice(&raw);
Ok(buf)
}
}
enum OptIdx {
NotUnion,
Single,
NullLeft,
NullRight,
}
trait MaybeData: std::fmt::Debug {
type Out;
fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;
fn on_struct(self, st: &StructType, avro: &AvroSchema) -> Result<Self::Out>;
fn on_list(self, elem: &DataType, avro: &AvroSchema) -> Result<Self::Out>;
fn on_map(self, value_type: &DataType, avro_value_schema: &AvroSchema) -> Result<Self::Out>;
fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}
impl MaybeData for () {
type Out = ();
fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
Ok(self)
}
fn on_struct(self, st: &StructType, avro: &AvroSchema) -> Result<Self::Out> {
validate_fields(st.iter(), avro)
}
fn on_list(self, elem: &DataType, avro: &AvroSchema) -> Result<Self::Out> {
on_field(elem, (), avro)
}
fn on_map(self, elem: &DataType, avro: &AvroSchema) -> Result<Self::Out> {
on_field(elem, (), avro)
}
fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
Ok(out)
}
}
impl MaybeData for DatumRef<'_> {
type Out = Value;
fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
match self {
Some(s) => f(s),
None => Ok(Value::Null),
}
}
fn on_struct(self, st: &StructType, avro: &AvroSchema) -> Result<Self::Out> {
let d = match self {
Some(s) => s.into_struct(),
None => return Ok(Value::Null),
};
let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro)?;
Ok(record.into())
}
fn on_list(self, elem: &DataType, avro: &AvroSchema) -> Result<Self::Out> {
let d = match self {
Some(s) => s.into_list(),
None => return Ok(Value::Null),
};
let vs = d.iter().map(|d| on_field(elem, d, avro)).try_collect()?;
Ok(Value::Array(vs))
}
fn on_map(self, elem: &DataType, avro: &AvroSchema) -> Result<Self::Out> {
let d = match self {
Some(s) => s.into_map(),
None => return Ok(Value::Null),
};
let vs = d
.iter()
.map(|(k, v)| {
let k = k.into_utf8().to_string();
let v = on_field(elem, v, avro)?;
Ok((k, v))
})
.try_collect()?;
Ok(Value::Map(vs))
}
fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
use OptIdx::*;
match out == Value::Null {
true => {
let ni = match opt_idx {
NotUnion | Single => {
return Err(FieldEncodeError::new("found null but required"))
}
NullLeft => 0,
NullRight => 1,
};
Ok(Value::Union(ni, out.into()))
}
false => {
let vi = match opt_idx {
NotUnion => return Ok(out),
NullLeft => 1,
Single | NullRight => 0,
};
Ok(Value::Union(vi, out.into()))
}
}
}
}
fn validate_fields<'rw>(
rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
avro: &AvroSchema,
) -> Result<()> {
let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
return Err(FieldEncodeError::new(format!(
"expect avro record but got {}",
avro.canonical_form(),
)));
};
let mut present = vec![false; fields.len()];
for (name, t) in rw_fields {
let Some(&idx) = lookup.get(name) else {
return Err(FieldEncodeError::new("field not in avro").with_name(name));
};
present[idx] = true;
let avro_field = &fields[idx];
on_field(t, (), &avro_field.schema).map_err(|e| e.with_name(name))?;
}
for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
if p {
continue;
}
if !avro_field.is_nullable() {
return Err(
FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
);
}
}
Ok(())
}
fn encode_fields<'avro, 'rw>(
fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
schema: &'avro AvroSchema,
) -> Result<Record<'avro>> {
let mut record = Record::new(schema).unwrap();
let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
unreachable!()
};
let mut present = vec![false; fields.len()];
for ((name, t), d) in fields_with_datums {
let idx = lookup[name];
present[idx] = true;
let avro_field = &fields[idx];
let value = on_field(t, d, &avro_field.schema).map_err(|e| e.with_name(name))?;
record.put(name, value);
}
for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
if p {
continue;
}
let AvroSchema::Union(u) = &avro_field.schema else {
unreachable!()
};
let ni = u
.variants()
.iter()
.position(|a| a == &AvroSchema::Null)
.unwrap();
record.put(
&avro_field.name,
Value::Union(ni.try_into().unwrap(), Value::Null.into()),
);
}
Ok(record)
}
fn on_field<D: MaybeData>(data_type: &DataType, maybe: D, expected: &AvroSchema) -> Result<D::Out> {
use risingwave_common::types::Interval;
let no_match_err = || {
Err(FieldEncodeError::new(format!(
"cannot encode {} column as {} field",
data_type,
expected.canonical_form()
)))
};
if let AvroSchema::Ref { .. } = expected {
return Err(FieldEncodeError::new("avro name ref unsupported yet"));
}
let (inner, opt_idx) = match expected {
AvroSchema::Union(union) => match union.variants() {
[] => return no_match_err(),
[one] => (one, OptIdx::Single),
[AvroSchema::Null, r] => (r, OptIdx::NullLeft),
[l, AvroSchema::Null] => (l, OptIdx::NullRight),
_ => return no_match_err(),
},
_ => (expected, OptIdx::NotUnion),
};
let value = match &data_type {
DataType::Boolean => match inner {
AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
_ => return no_match_err(),
},
DataType::Varchar => match inner {
AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,
_ => return no_match_err(),
},
DataType::Bytea => match inner {
AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
_ => return no_match_err(),
},
DataType::Float32 => match inner {
AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
_ => return no_match_err(),
},
DataType::Float64 => match inner {
AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
_ => return no_match_err(),
},
DataType::Int32 => match inner {
AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
_ => return no_match_err(),
},
DataType::Int64 => match inner {
AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
_ => return no_match_err(),
},
DataType::Serial => match inner {
AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
_ => return no_match_err(),
},
DataType::Struct(st) => match inner {
AvroSchema::Record { .. } => maybe.on_struct(st, inner)?,
_ => return no_match_err(),
},
DataType::List(elem) => match inner {
AvroSchema::Array(avro_elem) => maybe.on_list(elem, avro_elem)?,
_ => return no_match_err(),
},
DataType::Map(m) => {
if *m.key() != DataType::Varchar {
return no_match_err();
}
match inner {
AvroSchema::Map(avro_value_type) => maybe.on_map(m.value(), avro_value_type)?,
_ => return no_match_err(),
}
}
DataType::Timestamptz => match inner {
AvroSchema::TimestampMicros => maybe.on_base(|s| {
Ok(Value::TimestampMicros(
s.into_timestamptz().timestamp_micros(),
))
})?,
AvroSchema::TimestampMillis => maybe.on_base(|s| {
Ok(Value::TimestampMillis(
s.into_timestamptz().timestamp_millis(),
))
})?,
_ => return no_match_err(),
},
DataType::Timestamp => return no_match_err(),
DataType::Date => match inner {
AvroSchema::Date => {
maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
}
_ => return no_match_err(),
},
DataType::Time => match inner {
AvroSchema::TimeMicros => {
maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
}
AvroSchema::TimeMillis => maybe.on_base(|s| {
Ok(Value::TimeMillis(
(Interval::from(s.into_time()).usecs() / 1000)
.try_into()
.unwrap(),
))
})?,
_ => return no_match_err(),
},
DataType::Interval => match inner {
AvroSchema::Duration => maybe.on_base(|s| {
use apache_avro::{Days, Duration, Millis, Months};
let iv = s.into_interval();
let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));
Ok(Value::Duration(Duration::new(
Months::new(iv.months().try_into().map_err(overflow)?),
Days::new(iv.days().try_into().map_err(overflow)?),
Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
)))
})?,
_ => return no_match_err(),
},
DataType::Int16 => return no_match_err(),
DataType::Decimal => return no_match_err(),
DataType::Jsonb => return no_match_err(),
DataType::Int256 => {
return no_match_err();
}
};
D::handle_nullable_union(value, opt_idx)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::str::FromStr;
use expect_test::expect;
use itertools::Itertools;
use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
use risingwave_common::catalog::Field;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
Date, Datum, Interval, ListValue, MapType, MapValue, Scalar, ScalarImpl, StructValue, Time,
Timestamptz, ToDatumRef,
};
use super::*;
#[track_caller]
fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema).unwrap();
assert_eq!(actual, expected);
}
#[track_caller]
fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
where
D::Out: std::fmt::Debug,
{
let avro_schema = AvroSchema::parse_str(avro).unwrap();
let err = on_field(t, d, &avro_schema).unwrap_err();
assert_eq!(err.to_string(), expected);
}
#[track_caller]
fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
let rw_type = DataType::from_str(rw_type).unwrap();
let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();
if let Err(validate_err) = on_field(&rw_type, (), &avro_schema) {
expected.assert_debug_eq(&validate_err);
return;
}
let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema);
match actual {
Ok(v) => expected.assert_eq(&print_avro_value(&v)),
Err(e) => expected.assert_debug_eq(&e),
}
}
fn print_avro_value(v: &Value) -> String {
match v {
Value::Map(m) => {
let mut res = "Map({".to_string();
for (k, v) in m.iter().sorted_by_key(|x| x.0) {
res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
}
res.push_str("})");
res
}
_ => format!("{v:?}"),
}
}
#[test]
fn test_encode_v2() {
test_v2(
"boolean",
"false",
r#""int""#,
expect![[r#"
FieldEncodeError {
message: "cannot encode boolean column as \"int\" field",
rev_path: [],
}
"#]],
);
test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);
test_v2(
"map(varchar,varchar)",
"{1:1,2:2,3:3}",
r#"{"type": "map","values": "string"}"#,
expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
);
test_v2(
"map(varchar,varchar)",
"{1:1,2:NULL,3:3}",
r#"{"type": "map","values": "string"}"#,
expect![[r#"
FieldEncodeError {
message: "found null but required",
rev_path: [],
}
"#]],
);
test_v2(
"map(varchar,varchar)",
"{1:1,2:NULL,3:3}",
r#"{"type": "map","values": ["null", "string"]}"#,
expect![[
r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
]],
);
test_v2(
"map(int,varchar)",
"{1:1,2:NULL,3:3}",
r#"{"type": "map","values": ["null", "string"]}"#,
expect![[r#"
FieldEncodeError {
message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
rev_path: [],
}
"#]],
);
}
#[test]
fn test_encode_avro_ok() {
test_ok(
&DataType::Boolean,
Some(ScalarImpl::Bool(false)),
r#""boolean""#,
Value::Boolean(false),
);
test_ok(
&DataType::Varchar,
Some(ScalarImpl::Utf8("RisingWave".into())),
r#""string""#,
Value::String("RisingWave".into()),
);
test_ok(
&DataType::Bytea,
Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
r#""bytes""#,
Value::Bytes([0xbe, 0xef].into()),
);
test_ok(
&DataType::Float32,
Some(ScalarImpl::Float32(3.5f32.into())),
r#""float""#,
Value::Float(3.5f32),
);
test_ok(
&DataType::Float64,
Some(ScalarImpl::Float64(4.25f64.into())),
r#""double""#,
Value::Double(4.25f64),
);
test_ok(
&DataType::Int32,
Some(ScalarImpl::Int32(16)),
r#""int""#,
Value::Int(16),
);
test_ok(
&DataType::Int64,
Some(ScalarImpl::Int64(i64::MAX)),
r#""long""#,
Value::Long(i64::MAX),
);
test_ok(
&DataType::Serial,
Some(ScalarImpl::Serial(i64::MAX.into())),
r#""long""#,
Value::Long(i64::MAX),
);
let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
test_ok(
&DataType::Timestamptz,
Some(ScalarImpl::Timestamptz(tstz)),
r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
Value::TimestampMicros(tstz.timestamp_micros()),
);
test_ok(
&DataType::Timestamptz,
Some(ScalarImpl::Timestamptz(tstz)),
r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
Value::TimestampMillis(tstz.timestamp_millis()),
);
test_ok(
&DataType::Date,
Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
r#"{"type": "int", "logicalType": "date"}"#,
Value::Date(1),
);
let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
test_ok(
&DataType::Time,
Some(ScalarImpl::Time(tm)),
r#"{"type": "long", "logicalType": "time-micros"}"#,
Value::TimeMicros(1000 * 1_000_000),
);
test_ok(
&DataType::Time,
Some(ScalarImpl::Time(tm)),
r#"{"type": "int", "logicalType": "time-millis"}"#,
Value::TimeMillis(1000 * 1000),
);
test_ok(
&DataType::Interval,
Some(ScalarImpl::Interval(Interval::from_month_day_usec(
13, 2, 1000000,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
Value::Duration(apache_avro::Duration::new(
apache_avro::Months::new(13),
apache_avro::Days::new(2),
apache_avro::Millis::new(1000),
)),
);
let mut inner_map_array_builder = MapArrayBuilder::with_type(
2,
DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
);
inner_map_array_builder.append(Some(
MapValue::try_from_kv(
ListValue::from_iter(["a", "b"]),
ListValue::from_iter([1, 2]),
)
.unwrap()
.as_scalar_ref(),
));
inner_map_array_builder.append(Some(
MapValue::try_from_kv(
ListValue::from_iter(["c", "d"]),
ListValue::from_iter([3, 4]),
)
.unwrap()
.as_scalar_ref(),
));
let inner_map_array = inner_map_array_builder.finish();
test_ok(
&DataType::Map(MapType::from_kv(
DataType::Varchar,
DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
)),
Some(ScalarImpl::Map(
MapValue::try_from_kv(
ListValue::from_iter(["k1", "k2"]),
ListValue::new(inner_map_array.into()),
)
.unwrap(),
)),
r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
Value::Map(HashMap::from_iter([
(
"k1".into(),
Value::Map(HashMap::from_iter([
("a".into(), Value::Int(1)),
("b".into(), Value::Int(2)),
])),
),
(
"k2".into(),
Value::Map(HashMap::from_iter([
("c".into(), Value::Int(3)),
("d".into(), Value::Int(4)),
])),
),
])),
);
}
#[test]
fn test_encode_avro_err() {
test_err(
&DataType::Struct(StructType::new(vec![
(
"p",
DataType::Struct(StructType::new(vec![
("x", DataType::Int32),
("y", DataType::Int32),
])),
),
(
"q",
DataType::Struct(StructType::new(vec![
("x", DataType::Int32),
("y", DataType::Int32),
])),
),
])),
Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Int32(-2)),
Some(ScalarImpl::Int32(-1)),
]))),
Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Int32(2)),
Some(ScalarImpl::Int32(1)),
]))),
])))
.to_datum_ref(),
r#"{
"type": "record",
"name": "Segment",
"fields": [
{
"name": "p",
"type": {
"type": "record",
"name": "Point",
"fields": [
{
"name": "x",
"type": "int"
},
{
"name": "y",
"type": "int"
}
]
}
},
{
"name": "q",
"type": "Point"
}
]
}"#,
"encode 'q' error: avro name ref unsupported yet",
);
test_err(
&DataType::Interval,
Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
-1,
-1,
i64::MAX,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
"encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
);
let avro_schema = AvroSchema::parse_str(
r#"{"type": "record", "name": "Root", "fields": [
{"name": "f0", "type": "int"}
]}"#,
)
.unwrap();
let mut record = Record::new(&avro_schema).unwrap();
record.put("f0", Value::String("2".into()));
let res: SinkResult<Vec<u8>> = AvroEncoded {
value: Value::from(record),
schema: Arc::new(avro_schema),
header: AvroHeader::ConfluentSchemaRegistry(42),
}
.ser_to();
assert_eq!(
res.unwrap_err().to_string(),
"Encode error: Value does not match schema"
);
}
#[test]
fn test_encode_avro_record() {
let avro_schema = AvroSchema::parse_str(
r#"{
"type": "record",
"name": "Root",
"fields": [
{"name": "req", "type": "int"},
{"name": "opt", "type": ["null", "long"]}
]
}"#,
)
.unwrap();
let avro_schema = Arc::new(avro_schema);
let header = AvroHeader::None;
let schema = Schema::new(vec![
Field::with_name(DataType::Int64, "opt"),
Field::with_name(DataType::Int32, "req"),
]);
let row = OwnedRow::new(vec![
Some(ScalarImpl::Int64(31)),
Some(ScalarImpl::Int32(15)),
]);
let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
let actual = encoder.encode(row).unwrap();
assert_eq!(
actual.value,
Value::Record(vec![
("req".into(), Value::Int(15)),
("opt".into(), Value::Union(1, Value::Long(31).into())),
])
);
let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
let actual = encoder.encode(row).unwrap();
assert_eq!(
actual.value,
Value::Record(vec![
("req".into(), Value::Int(15)),
("opt".into(), Value::Union(0, Value::Null.into())),
])
);
let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
panic!()
};
assert_eq!(
err.to_string(),
"Encode error: encode 'req' error: field not present but required"
);
let schema = Schema::new(vec![
Field::with_name(DataType::Int64, "opt"),
Field::with_name(DataType::Int32, "req"),
Field::with_name(DataType::Varchar, "extra"),
]);
let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
panic!()
};
assert_eq!(
err.to_string(),
"Encode error: encode 'extra' error: field not in avro"
);
let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
panic!()
};
assert_eq!(
err.to_string(),
r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
);
test_err(
&DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
(),
r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
"encode 'f0' error: cannot encode boolean column as \"int\" field",
);
}
#[test]
fn test_encode_avro_array() {
let avro_schema = r#"{
"type": "array",
"items": "int"
}"#;
test_ok(
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
avro_schema,
Value::Array(vec![Value::Int(4), Value::Int(5)]),
);
test_err(
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
avro_schema,
"encode '' error: found null but required",
);
test_ok(
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
r#"{
"type": "array",
"items": ["null", "int"]
}"#,
Value::Array(vec![
Value::Union(1, Value::Int(4).into()),
Value::Union(0, Value::Null.into()),
]),
);
test_ok(
&DataType::List(DataType::List(DataType::Int32.into()).into()),
Some(ScalarImpl::List(ListValue::from_iter([
ListValue::from_iter([26, 29]),
ListValue::from_iter([46, 49]),
]))),
r#"{
"type": "array",
"items": {
"type": "array",
"items": "int"
}
}"#,
Value::Array(vec![
Value::Array(vec![Value::Int(26), Value::Int(29)]),
Value::Array(vec![Value::Int(46), Value::Int(49)]),
]),
);
test_err(
&DataType::List(DataType::Boolean.into()),
(),
r#"{"type": "array", "items": "int"}"#,
"encode '' error: cannot encode boolean column as \"int\" field",
);
}
#[test]
fn test_encode_avro_union() {
let t = &DataType::Timestamptz;
let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
let empty = "[]";
let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;
test_ok(
t,
datum.clone(),
opt_micros,
Value::Union(1, Value::TimestampMicros(1500).into()),
);
test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
test_ok(
t,
datum.clone(),
opt_millis,
Value::Union(1, Value::TimestampMillis(1).into()),
);
test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));
test_err(
t,
datum.to_datum_ref(),
both,
r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
);
test_err(
t,
datum.to_datum_ref(),
empty,
"encode '' error: cannot encode timestamp with time zone column as [] field",
);
test_ok(
t,
datum.clone(),
one,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_err(t, None, one, "encode '' error: found null but required");
test_ok(
t,
datum.clone(),
right,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_ok(t, None, right, Value::Union(1, Value::Null.into()));
}
#[test]
fn test_encode_avro_lib_bug() {
use apache_avro::{Reader, Writer};
let avro_schema = AvroSchema::parse_str(
r#"{
"type": "record",
"name": "Root",
"fields": [
{
"name": "f0",
"type": ["null", "int"]
},
{
"name": "f1",
"type": ["null", "int"]
}
]
}"#,
)
.unwrap();
let mut writer = Writer::new(&avro_schema, Vec::new());
let mut record = Record::new(writer.schema()).unwrap();
record.put("f1", Value::Int(3));
writer.append(record).unwrap();
let encoded = writer.into_inner().unwrap();
let reader = Reader::new(encoded.as_slice()).unwrap();
for value in reader {
assert_eq!(
value.unwrap_err().to_string(),
"Union index 3 out of bounds: 2"
);
}
let mut writer = Writer::new(&avro_schema, Vec::new());
let mut record = Record::new(writer.schema()).unwrap();
record.put("f1", Value::Union(1, Value::Int(3).into()));
writer.append(record).unwrap();
let encoded = writer.into_inner().unwrap();
let reader = Reader::new(encoded.as_slice()).unwrap();
for value in reader {
assert_eq!(
value.unwrap(),
Value::Record(vec![
("f0".into(), Value::Union(1, Value::Int(3).into())),
("f1".into(), Value::Union(0, Value::Null.into())),
])
);
}
}
}