risingwave_connector/sink/encoder/
avro.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
19use apache_avro::types::{Record, Value};
20use risingwave_common::array::VECTOR_AS_LIST_TYPE;
21use risingwave_common::catalog::Schema;
22use risingwave_common::row::Row;
23use risingwave_common::types::{DataType, DatumRef, ListType, ScalarRefImpl, StructType};
24use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
25use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
26use thiserror_ext::AsReport;
27
28use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};
29
/// Result type used by the field-level validate/encode helpers in this module.
/// Errors are [`FieldEncodeError`]s, which can be tagged with the offending field name.
type Result<T> = std::result::Result<T, FieldEncodeError>;
/// Named types of a root Avro schema, keyed by full name, used to resolve
/// `AvroSchema::Ref` nodes back to their definitions.
struct NamesRef(HashMap<Name, AvroSchema>);
32
/// Encodes RisingWave rows into Avro record values for a fixed Avro schema.
///
/// Column/schema compatibility is checked once, up front, in [`AvroEncoder::new`].
pub struct AvroEncoder {
    /// RisingWave schema of the incoming rows.
    schema: Schema,
    /// Optional subset of columns; `None` means every column of `schema` was validated.
    col_indices: Option<Vec<usize>>,
    /// Root Avro schema; must resolve to a record (validation rejects anything else).
    avro_schema: Arc<AvroSchema>,
    /// Named types of `avro_schema`, used to resolve references while encoding.
    refs: NamesRef,
    /// Framing written before each serialized datum.
    header: AvroHeader,
}
40
/// Framing placed in front of the Avro binary datum when an [`AvroEncoded`] is
/// serialized to bytes.
///
/// Only [`AvroHeader::ConfluentSchemaRegistry`] and [`AvroHeader::GlueSchemaRegistry`]
/// are implemented so far; serializing with any other variant returns an encode error.
#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    /// No framing prefix (currently rejected as unsupported during serialization).
    None,
    /// <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
    ///
    /// * C3 01
    /// * 8-byte little-endian CRC-64-AVRO fingerprint
    SingleObject,
    /// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
    ///
    /// * 4F 62 6A 01
    /// * schema
    /// * 16-byte random sync marker
    ContainerFile,
    /// <https://docs.confluent.io/platform/7.5/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format>
    ///
    /// * 00
    /// * 4-byte big-endian schema ID
    ConfluentSchemaRegistry(i32),
    /// <https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.20/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L59-L61>
    ///
    /// * 03
    /// * 00
    /// * 16-byte UUID identifying a specific schema version
    GlueSchemaRegistry(uuid::Uuid),
}
67
68impl AvroEncoder {
69    pub fn new(
70        schema: Schema,
71        col_indices: Option<Vec<usize>>,
72        avro_schema: Arc<AvroSchema>,
73        header: AvroHeader,
74    ) -> SinkResult<Self> {
75        let refs = NamesRef::new(&avro_schema)?;
76        match &col_indices {
77            Some(col_indices) => validate_fields(
78                col_indices.iter().map(|idx| {
79                    let f = &schema[*idx];
80                    (f.name.as_str(), &f.data_type)
81                }),
82                &avro_schema,
83                &refs,
84            )?,
85            None => validate_fields(
86                schema
87                    .fields
88                    .iter()
89                    .map(|f| (f.name.as_str(), &f.data_type)),
90                &avro_schema,
91                &refs,
92            )?,
93        };
94
95        Ok(Self {
96            schema,
97            col_indices,
98            avro_schema,
99            refs,
100            header,
101        })
102    }
103}
104
105impl NamesRef {
106    fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
107        let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
108        let refs = resolved
109            .get_names()
110            .iter()
111            .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
112            .collect();
113        Ok(Self(refs))
114    }
115
116    fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
117        match avro {
118            AvroSchema::Ref { name } => &self.0[name],
119            _ => avro,
120        }
121    }
122}
123
/// One encoded row: the Avro [`Value`] plus the schema and header needed to turn it
/// into bytes via [`SerTo`].
pub struct AvroEncoded {
    /// Record value produced by the encoder.
    value: Value,
    /// Schema the value is written against.
    schema: Arc<AvroSchema>,
    /// Framing written before the datum.
    header: AvroHeader,
}
129
130impl RowEncoder for AvroEncoder {
131    type Output = AvroEncoded;
132
133    fn schema(&self) -> &Schema {
134        &self.schema
135    }
136
137    fn col_indices(&self) -> Option<&[usize]> {
138        self.col_indices.as_deref()
139    }
140
141    fn encode_cols(
142        &self,
143        row: impl Row,
144        col_indices: impl Iterator<Item = usize>,
145    ) -> SinkResult<Self::Output> {
146        let record = encode_fields(
147            col_indices.map(|idx| {
148                let f = &self.schema[idx];
149                ((f.name.as_str(), &f.data_type), row.datum_at(idx))
150            }),
151            &self.avro_schema,
152            &self.refs,
153        )?;
154        Ok(AvroEncoded {
155            value: record.into(),
156            schema: self.avro_schema.clone(),
157            header: self.header,
158        })
159    }
160}
161
162impl SerTo<Vec<u8>> for AvroEncoded {
163    fn ser_to(self) -> SinkResult<Vec<u8>> {
164        use bytes::BufMut as _;
165
166        let header = match self.header {
167            AvroHeader::ConfluentSchemaRegistry(schema_id) => {
168                let mut buf = Vec::with_capacity(1 + 4);
169                buf.put_u8(0);
170                buf.put_i32(schema_id);
171                buf
172            }
173            AvroHeader::GlueSchemaRegistry(schema_version_id) => {
174                let mut buf = Vec::with_capacity(1 + 1 + 16);
175                buf.put_u8(3);
176                buf.put_u8(0);
177                buf.put_slice(schema_version_id.as_bytes());
178                buf
179            }
180            AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
181                return Err(crate::sink::SinkError::Encode(format!(
182                    "{:?} unsupported yet",
183                    self.header
184                )));
185            }
186        };
187
188        let raw = apache_avro::to_avro_datum(&self.schema, self.value)
189            .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
190        let mut buf = Vec::with_capacity(header.len() + raw.len());
191        buf.put_slice(&header);
192        buf.put_slice(&raw);
193
194        Ok(buf)
195    }
196}
197
/// Shape of an Avro field schema with respect to nullability, as detected by [`on_field`].
///
/// Decides which union branch index (if any) an encoded value is wrapped in.
enum OptIdx {
    /// `T` — not a union; non-null values are written directly, null is rejected.
    NotUnion,
    /// `[T]` — single-variant union; values go in branch 0, null is rejected.
    Single,
    /// `[null, T]` — null is branch 0, values are branch 1.
    NullLeft,
    /// `[T, null]` — values are branch 0, null is branch 1.
    NullRight,
}
208
209/// A trait that assists code reuse between `validate` and `encode`.
210/// * For `validate`, the inputs are (RisingWave type, ProtoBuf type).
211/// * For `encode`, the inputs are (RisingWave type, RisingWave data, ProtoBuf type).
212///
213/// Thus we impl [`MaybeData`] for both [`()`] and [`DatumRef`].
214trait MaybeData: std::fmt::Debug {
215    type Out;
216
217    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;
218
219    /// Switch to `RecordSchema` after #12562
220    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;
221
222    fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;
223
224    fn on_map(
225        self,
226        value_type: &DataType,
227        avro_value_schema: &AvroSchema,
228        refs: &NamesRef,
229    ) -> Result<Self::Out>;
230
231    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
232}
233
234impl MaybeData for () {
235    type Out = ();
236
237    fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
238        Ok(self)
239    }
240
241    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
242        validate_fields(st.iter(), avro, refs)
243    }
244
245    fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
246        on_field(lt.elem(), (), avro, refs)
247    }
248
249    fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
250        on_field(elem, (), avro, refs)
251    }
252
253    fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
254        Ok(out)
255    }
256}
257
258impl MaybeData for DatumRef<'_> {
259    type Out = Value;
260
261    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
262        match self {
263            Some(s) => f(s),
264            None => Ok(Value::Null),
265        }
266    }
267
268    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
269        let d = match self {
270            Some(s) => s.into_struct(),
271            None => return Ok(Value::Null),
272        };
273        let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
274        Ok(record.into())
275    }
276
277    fn on_list(self, lt: &ListType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
278        let d = match self {
279            Some(s) => s.into_list(),
280            None => return Ok(Value::Null),
281        };
282        let vs = d
283            .iter()
284            .map(|d| on_field(lt.elem(), d, avro, refs))
285            .try_collect()?;
286        Ok(Value::Array(vs))
287    }
288
289    fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
290        let d = match self {
291            Some(s) => s.into_map(),
292            None => return Ok(Value::Null),
293        };
294        let vs = d
295            .iter()
296            .map(|(k, v)| {
297                let k = k.into_utf8().to_owned();
298                let v = on_field(elem, v, avro, refs)?;
299                Ok((k, v))
300            })
301            .try_collect()?;
302        Ok(Value::Map(vs))
303    }
304
305    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
306        use OptIdx::*;
307
308        match out == Value::Null {
309            true => {
310                let ni = match opt_idx {
311                    NotUnion | Single => {
312                        return Err(FieldEncodeError::new("found null but required"));
313                    }
314                    NullLeft => 0,
315                    NullRight => 1,
316                };
317                Ok(Value::Union(ni, out.into()))
318            }
319            false => {
320                let vi = match opt_idx {
321                    NotUnion => return Ok(out),
322                    NullLeft => 1,
323                    Single | NullRight => 0,
324                };
325                Ok(Value::Union(vi, out.into()))
326            }
327        }
328    }
329}
330
331fn validate_fields<'rw>(
332    rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
333    avro: &AvroSchema,
334    refs: &NamesRef,
335) -> Result<()> {
336    let avro = refs.lookup(avro);
337    let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
338        return Err(FieldEncodeError::new(format!(
339            "expect avro record but got {}",
340            avro.canonical_form(),
341        )));
342    };
343    let mut present = vec![false; fields.len()];
344    for (name, t) in rw_fields {
345        let Some(&idx) = lookup.get(name) else {
346            return Err(FieldEncodeError::new("field not in avro").with_name(name));
347        };
348        present[idx] = true;
349        let avro_field = &fields[idx];
350        on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
351    }
352    for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
353        if p {
354            continue;
355        }
356        if !avro_field.is_nullable() {
357            return Err(
358                FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
359            );
360        }
361    }
362    Ok(())
363}
364
365fn encode_fields<'avro, 'rw>(
366    fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
367    schema: &'avro AvroSchema,
368    refs: &'avro NamesRef,
369) -> Result<Record<'avro>> {
370    let schema = refs.lookup(schema);
371    let mut record = Record::new(schema).unwrap();
372    let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
373        unreachable!()
374    };
375    let mut present = vec![false; fields.len()];
376    for ((name, t), d) in fields_with_datums {
377        let idx = lookup[name];
378        present[idx] = true;
379        let avro_field = &fields[idx];
380        let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
381        record.put(name, value);
382    }
383    // Unfortunately, the upstream `apache_avro` does not handle missing fields as nullable correctly.
384    // The correct encoding is `Value::Union(null_index, Value::Null)` but it simply writes `Value::Null`.
385    // See [`tests::test_encode_avro_lib_bug`].
386    for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
387        if p {
388            continue;
389        }
390        let AvroSchema::Union(u) = &avro_field.schema else {
391            unreachable!()
392        };
393        // We could have saved null index of each field during [`validate_fields`] to avoid repeated lookup.
394        // But in most cases it is the 0th.
395        // Alternatively, we can simplify by enforcing the best practice of `null at 0th`.
396        let ni = u
397            .variants()
398            .iter()
399            .position(|a| a == &AvroSchema::Null)
400            .unwrap();
401        record.put(
402            &avro_field.name,
403            Value::Union(ni.try_into().unwrap(), Value::Null.into()),
404        );
405    }
406    Ok(record)
407}
408
/// Handles both `validate` (without actual data) and `encode`.
/// See [`MaybeData`] for more info.
///
/// Matches `data_type` against the `expected` Avro schema. A union is accepted only in
/// the shapes `[T]`, `[null, T]` and `[T, null]`, and named references are resolved
/// through `refs`. When `maybe` carries a datum, it is converted into an Avro [`Value`]
/// and then wrapped (or rejected) by [`MaybeData::handle_nullable_union`].
///
/// # Errors
/// Returns a [`FieldEncodeError`] when the types are incompatible. When encoding, it also
/// returns one when a value cannot be represented: a string that is not an enum symbol,
/// an interval component out of range, a decimal that is NaN/infinite or cannot be
/// rescaled, or a null for a non-nullable field.
fn on_field<D: MaybeData>(
    data_type: &DataType,
    maybe: D,
    expected: &AvroSchema,
    refs: &NamesRef,
) -> Result<D::Out> {
    use risingwave_common::types::Interval;

    // Error for any incompatible (RisingWave type, Avro schema) pair. It reports the full
    // `expected` schema, including any union wrapper.
    let no_match_err = || {
        Err(FieldEncodeError::new(format!(
            "cannot encode {} column as {} field",
            data_type,
            expected.canonical_form()
        )))
    };

    // For now, we only support optional single type, rather than general union.
    // For example, how do we encode int16 into avro `["int", "long"]`?
    let (inner, opt_idx) = match expected {
        AvroSchema::Union(union) => match union.variants() {
            [] => return no_match_err(),
            [one] => (one, OptIdx::Single),
            [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
            [l, AvroSchema::Null] => (l, OptIdx::NullRight),
            _ => return no_match_err(),
        },
        _ => (expected, OptIdx::NotUnion),
    };

    // Resolve named references so the match below sees the concrete schema.
    let inner = refs.lookup(inner);

    let value = match &data_type {
        // Group A: perfect match between RisingWave types and Avro types
        DataType::Boolean => match inner {
            AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
            _ => return no_match_err(),
        },
        DataType::Varchar => match inner {
            AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,

            // Avro enum: a varchar is encoded as the index of its matching symbol.
            // Strings that are not declared symbols are rejected at encode time.
            AvroSchema::Enum(enum_schema) => maybe.on_base(|s| {
                let str_value = s.into_utf8();

                if let Some(position) = enum_schema
                    .symbols
                    .iter()
                    .position(|symbol| symbol == str_value)
                {
                    Ok(Value::Enum(position as u32, str_value.to_owned()))
                } else {
                    Err(FieldEncodeError::new(format!(
                        "Value '{}' is not a valid enum symbol. Valid symbols are: {:?}",
                        str_value, enum_schema.symbols
                    )))
                }
            })?,

            _ => return no_match_err(),
        },
        DataType::Bytea => match inner {
            AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float32 => match inner {
            AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float64 => match inner {
            AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
            _ => return no_match_err(),
        },
        DataType::Int32 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
            _ => return no_match_err(),
        },
        DataType::Int64 => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
            _ => return no_match_err(),
        },
        DataType::Serial => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
            _ => return no_match_err(),
        },
        DataType::Struct(st) => match inner {
            AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
            _ => return no_match_err(),
        },
        DataType::List(lt) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(lt, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Map(m) => {
            // Avro map keys are always strings, so only varchar-keyed maps are accepted.
            if *m.key() != DataType::Varchar {
                return no_match_err();
            }
            match inner {
                AvroSchema::Map(avro_value_type) => {
                    maybe.on_map(m.value(), avro_value_type, refs)?
                }
                _ => return no_match_err(),
            }
        }

        // Group B: match between RisingWave types and Avro logical types
        DataType::Timestamptz => match inner {
            AvroSchema::TimestampMicros => maybe.on_base(|s| {
                Ok(Value::TimestampMicros(
                    s.into_timestamptz().timestamp_micros(),
                ))
            })?,
            AvroSchema::TimestampMillis => maybe.on_base(|s| {
                Ok(Value::TimestampMillis(
                    s.into_timestamptz().timestamp_millis(),
                ))
            })?,
            _ => return no_match_err(),
        },
        // Timestamp without time zone currently has no Avro mapping here.
        DataType::Timestamp => return no_match_err(),
        DataType::Date => match inner {
            AvroSchema::Date => {
                maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
            }
            _ => return no_match_err(),
        },
        DataType::Time => match inner {
            AvroSchema::TimeMicros => {
                maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
            }
            // Sub-millisecond precision is truncated. A time of day in milliseconds should
            // stay below 86_400_000, so this conversion presumably never fails (TODO confirm).
            AvroSchema::TimeMillis => maybe.on_base(|s| {
                Ok(Value::TimeMillis(
                    (Interval::from(s.into_time()).usecs() / 1000)
                        .try_into()
                        .unwrap(),
                ))
            })?,
            _ => return no_match_err(),
        },
        DataType::Interval => match inner {
            // Each component is converted with a checked conversion, so out-of-range
            // (e.g. negative) components yield an error instead of wrapping.
            // Sub-millisecond precision is truncated.
            AvroSchema::Duration => maybe.on_base(|s| {
                use apache_avro::{Days, Duration, Millis, Months};
                let iv = s.into_interval();

                let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));

                Ok(Value::Duration(Duration::new(
                    Months::new(iv.months().try_into().map_err(overflow)?),
                    Days::new(iv.days().try_into().map_err(overflow)?),
                    Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
                )))
            })?,
            _ => return no_match_err(),
        },
        // Group C: experimental
        DataType::Int16 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
            _ => return no_match_err(),
        },
        DataType::Decimal => match inner {
            AvroSchema::Decimal(decimal_schema) => {
                maybe.on_base(|s| {
                    match s.into_decimal() {
                        risingwave_common::types::Decimal::Normalized(decimal) => {
                            // convert to bigint with scale
                            // rescale the rust_decimal to the scale of the avro decimal
                            //
                            // From bigdecimal::BigDecimal::with_scale:
                            // If the new_scale is lower than the current value (indicating a larger
                            // power of 10), digits will be dropped (as precision is lower)
                            let signed_bigint_bytes =
                                rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
                                    .map_err(FieldEncodeError::new)?;
                            Ok(Value::Decimal(apache_avro::Decimal::from(
                                &signed_bigint_bytes,
                            )))
                        }
                        d @ risingwave_common::types::Decimal::NaN
                        | d @ risingwave_common::types::Decimal::NegativeInf
                        | d @ risingwave_common::types::Decimal::PositiveInf => {
                            Err(FieldEncodeError::new(format!(
                                "Avro Decimal does not support NaN or Inf, but got {}",
                                d
                            )))
                        }
                    }
                })?
            }
            _ => return no_match_err(),
        },
        // JSONB is written as its textual form into an Avro string.
        DataType::Jsonb => match inner {
            AvroSchema::String => {
                maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
            }
            _ => return no_match_err(),
        },
        // Vectors are encoded like lists, using `VECTOR_AS_LIST_TYPE` as the list type.
        DataType::Vector(_) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(&VECTOR_AS_LIST_TYPE, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        // Group D: unsupported
        DataType::Int256 => {
            return no_match_err();
        }
    };

    // Wrap (or reject) the value according to the union shape detected above.
    D::handle_nullable_union(value, opt_idx)
}
618
619#[cfg(test)]
620mod tests {
621    use std::collections::HashMap;
622    use std::str::FromStr;
623
624    use expect_test::expect;
625    use itertools::Itertools;
626    use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
627    use risingwave_common::catalog::Field;
628    use risingwave_common::row::OwnedRow;
629    use risingwave_common::types::{
630        Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
631        StructValue, Time, Timestamptz, ToDatumRef,
632    };
633
634    use super::*;
635
636    #[track_caller]
637    fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
638        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
639        let refs = NamesRef::new(&avro_schema).unwrap();
640        let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
641        assert_eq!(actual, expected);
642    }
643
644    #[track_caller]
645    fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
646    where
647        D::Out: std::fmt::Debug,
648    {
649        let avro_schema = AvroSchema::parse_str(avro).unwrap();
650        let refs = NamesRef::new(&avro_schema).unwrap();
651        let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
652        assert_eq!(err.to_string(), expected);
653    }
654
655    #[track_caller]
656    fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
657        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
658        let refs = NamesRef::new(&avro_schema).unwrap();
659        let rw_type = DataType::from_str(rw_type).unwrap();
660        let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();
661
662        if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
663            expected.assert_debug_eq(&validate_err);
664            return;
665        }
666        let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
667        match actual {
668            Ok(v) => expected.assert_eq(&print_avro_value(&v)),
669            Err(e) => expected.assert_debug_eq(&e),
670        }
671    }
672
673    fn print_avro_value(v: &Value) -> String {
674        match v {
675            Value::Map(m) => {
676                let mut res = "Map({".to_owned();
677                for (k, v) in m.iter().sorted_by_key(|x| x.0) {
678                    res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
679                }
680                res.push_str("})");
681                res
682            }
683            _ => format!("{v:?}"),
684        }
685    }
686
687    #[test]
688    fn test_encode_v2() {
689        test_v2(
690            "boolean",
691            "false",
692            r#""int""#,
693            expect![[r#"
694                FieldEncodeError {
695                    message: "cannot encode boolean column as \"int\" field",
696                    rev_path: [],
697                }
698            "#]],
699        );
700        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);
701
702        test_v2(
703            "map(varchar,varchar)",
704            "{1:1,2:2,3:3}",
705            r#"{"type": "map","values": "string"}"#,
706            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
707        );
708
709        test_v2(
710            "map(varchar,varchar)",
711            "{1:1,2:NULL,3:3}",
712            r#"{"type": "map","values": "string"}"#,
713            expect![[r#"
714                FieldEncodeError {
715                    message: "found null but required",
716                    rev_path: [],
717                }
718            "#]],
719        );
720
721        test_v2(
722            "map(varchar,varchar)",
723            "{1:1,2:NULL,3:3}",
724            r#"{"type": "map","values": ["null", "string"]}"#,
725            expect![[
726                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
727            ]],
728        );
729
730        test_v2(
731            "map(int,varchar)",
732            "{1:1,2:NULL,3:3}",
733            r#"{"type": "map","values": ["null", "string"]}"#,
734            expect![[r#"
735                FieldEncodeError {
736                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
737                    rev_path: [],
738                }
739            "#]],
740        );
741    }
742
743    #[test]
744    fn test_encode_avro_ok() {
745        test_ok(
746            &DataType::Boolean,
747            Some(ScalarImpl::Bool(false)),
748            r#""boolean""#,
749            Value::Boolean(false),
750        );
751
752        test_ok(
753            &DataType::Varchar,
754            Some(ScalarImpl::Utf8("RisingWave".into())),
755            r#""string""#,
756            Value::String("RisingWave".into()),
757        );
758
759        test_ok(
760            &DataType::Bytea,
761            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
762            r#""bytes""#,
763            Value::Bytes([0xbe, 0xef].into()),
764        );
765
766        test_ok(
767            &DataType::Float32,
768            Some(ScalarImpl::Float32(3.5f32.into())),
769            r#""float""#,
770            Value::Float(3.5f32),
771        );
772
773        test_ok(
774            &DataType::Float64,
775            Some(ScalarImpl::Float64(4.25f64.into())),
776            r#""double""#,
777            Value::Double(4.25f64),
778        );
779
780        test_ok(
781            &DataType::Int32,
782            Some(ScalarImpl::Int32(16)),
783            r#""int""#,
784            Value::Int(16),
785        );
786
787        test_ok(
788            &DataType::Int64,
789            Some(ScalarImpl::Int64(i64::MAX)),
790            r#""long""#,
791            Value::Long(i64::MAX),
792        );
793
794        test_ok(
795            &DataType::Serial,
796            Some(ScalarImpl::Serial(i64::MAX.into())),
797            r#""long""#,
798            Value::Long(i64::MAX),
799        );
800
801        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
802        test_ok(
803            &DataType::Timestamptz,
804            Some(ScalarImpl::Timestamptz(tstz)),
805            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
806            Value::TimestampMicros(tstz.timestamp_micros()),
807        );
808        test_ok(
809            &DataType::Timestamptz,
810            Some(ScalarImpl::Timestamptz(tstz)),
811            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
812            Value::TimestampMillis(tstz.timestamp_millis()),
813        );
814
815        test_ok(
816            &DataType::Date,
817            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
818            r#"{"type": "int", "logicalType": "date"}"#,
819            Value::Date(1),
820        );
821
822        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
823        test_ok(
824            &DataType::Time,
825            Some(ScalarImpl::Time(tm)),
826            r#"{"type": "long", "logicalType": "time-micros"}"#,
827            Value::TimeMicros(1000 * 1_000_000),
828        );
829        test_ok(
830            &DataType::Time,
831            Some(ScalarImpl::Time(tm)),
832            r#"{"type": "int", "logicalType": "time-millis"}"#,
833            Value::TimeMillis(1000 * 1000),
834        );
835
836        test_ok(
837            &DataType::Int16,
838            Some(ScalarImpl::Int16(i16::MAX)),
839            r#""int""#,
840            Value::Int(i16::MAX as i32),
841        );
842
843        test_ok(
844            &DataType::Int16,
845            Some(ScalarImpl::Int16(i16::MIN)),
846            r#""int""#,
847            Value::Int(i16::MIN as i32),
848        );
849
850        test_ok(
851            &DataType::Jsonb,
852            Some(ScalarImpl::Jsonb(
853                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
854            )),
855            r#""string""#,
856            Value::String(r#"{"a": 1}"#.into()),
857        );
858
859        test_ok(
860            &DataType::Interval,
861            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
862                13, 2, 1000000,
863            ))),
864            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
865            Value::Duration(apache_avro::Duration::new(
866                apache_avro::Months::new(13),
867                apache_avro::Days::new(2),
868                apache_avro::Millis::new(1000),
869            )),
870        );
871
872        let mut inner_map_array_builder = MapArrayBuilder::with_type(
873            2,
874            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
875        );
876        inner_map_array_builder.append(Some(
877            MapValue::try_from_kv(
878                ListValue::from_iter(["a", "b"]),
879                ListValue::from_iter([1, 2]),
880            )
881            .unwrap()
882            .as_scalar_ref(),
883        ));
884        inner_map_array_builder.append(Some(
885            MapValue::try_from_kv(
886                ListValue::from_iter(["c", "d"]),
887                ListValue::from_iter([3, 4]),
888            )
889            .unwrap()
890            .as_scalar_ref(),
891        ));
892        let inner_map_array = inner_map_array_builder.finish();
893        test_ok(
894            &DataType::Map(MapType::from_kv(
895                DataType::Varchar,
896                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
897            )),
898            Some(ScalarImpl::Map(
899                MapValue::try_from_kv(
900                    ListValue::from_iter(["k1", "k2"]),
901                    ListValue::new(inner_map_array.into()),
902                )
903                .unwrap(),
904            )),
905            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
906            Value::Map(HashMap::from_iter([
907                (
908                    "k1".into(),
909                    Value::Map(HashMap::from_iter([
910                        ("a".into(), Value::Int(1)),
911                        ("b".into(), Value::Int(2)),
912                    ])),
913                ),
914                (
915                    "k2".into(),
916                    Value::Map(HashMap::from_iter([
917                        ("c".into(), Value::Int(3)),
918                        ("d".into(), Value::Int(4)),
919                    ])),
920                ),
921            ])),
922        );
923
924        test_ok(
925            &DataType::Struct(StructType::new(vec![
926                (
927                    "p",
928                    DataType::Struct(StructType::new(vec![
929                        ("x", DataType::Int32),
930                        ("y", DataType::Int32),
931                    ])),
932                ),
933                (
934                    "q",
935                    DataType::Struct(StructType::new(vec![
936                        ("x", DataType::Int32),
937                        ("y", DataType::Int32),
938                    ])),
939                ),
940            ])),
941            Some(ScalarImpl::Struct(StructValue::new(vec![
942                Some(ScalarImpl::Struct(StructValue::new(vec![
943                    Some(ScalarImpl::Int32(-2)),
944                    Some(ScalarImpl::Int32(-1)),
945                ]))),
946                Some(ScalarImpl::Struct(StructValue::new(vec![
947                    Some(ScalarImpl::Int32(2)),
948                    Some(ScalarImpl::Int32(1)),
949                ]))),
950            ]))),
951            r#"{
952                "type": "record",
953                "name": "Segment",
954                "fields": [
955                    {
956                        "name": "p",
957                        "type": {
958                            "type": "record",
959                            "name": "Point",
960                            "fields": [
961                                {
962                                    "name": "x",
963                                    "type": "int"
964                                },
965                                {
966                                    "name": "y",
967                                    "type": "int"
968                                }
969                            ]
970                        }
971                    },
972                    {
973                        "name": "q",
974                        "type": "Point"
975                    }
976                ]
977            }"#,
978            Value::Record(vec![
979                (
980                    "p".to_owned(),
981                    Value::Record(vec![
982                        ("x".to_owned(), Value::Int(-2)),
983                        ("y".to_owned(), Value::Int(-1)),
984                    ]),
985                ),
986                (
987                    "q".to_owned(),
988                    Value::Record(vec![
989                        ("x".to_owned(), Value::Int(2)),
990                        ("y".to_owned(), Value::Int(1)),
991                    ]),
992                ),
993            ]),
994        );
995
996        // NEW: Varchar to Enum tests
997        test_ok(
998            &DataType::Varchar,
999            Some(ScalarImpl::Utf8("RED".into())),
1000            r#"{"type": "enum", "name": "Color", "symbols": ["RED", "GREEN", "BLUE"]}"#,
1001            Value::Enum(0, "RED".to_owned()),
1002        );
1003
1004        test_ok(
1005            &DataType::Varchar,
1006            Some(ScalarImpl::Utf8("BLUE".into())),
1007            r#"{"type": "enum", "name": "Color", "symbols": ["RED", "GREEN", "BLUE"]}"#,
1008            Value::Enum(2, "BLUE".to_owned()),
1009        );
1010
1011        test_ok(
1012            &DataType::Varchar,
1013            Some(ScalarImpl::Utf8("ACTIVE".into())),
1014            r#"{"type": "enum", "name": "Status", "symbols": ["ACTIVE", "INACTIVE"]}"#,
1015            Value::Enum(0, "ACTIVE".to_owned()),
1016        );
1017
1018        // Test complex JSON with nested structures - using serde_json::Value comparison
1019        let complex_json = r#"{
1020            "person": {
1021                "name": "John Doe",
1022                "age": 30,
1023                "address": {
1024                    "street": "123 Main St.",
1025                    "city": "New York",
1026                    "coordinates": [40.7128, -74.0060]
1027                },
1028                "contacts": [
1029                    {"type": "email", "value": "john@example.com"},
1030                    {"type": "phone", "value": "+1-555-123-4567"}
1031                ],
1032                "active": true,
1033                "preferences": {
1034                    "notifications": true,
1035                    "theme": "dark",
1036                    "languages": ["en", "es"],
1037                    "lastLogin": null
1038                },
1039                "tags": ["premium", "verified"],
1040                "unicode_test": "Hello, δΈ–η•Œ! 🌍"
1041            }
1042        }"#;
1043
1044        let input_json = JsonbVal::from_str(complex_json).unwrap();
1045        let result = on_field(
1046            &DataType::Jsonb,
1047            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
1048            &AvroSchema::parse_str(r#""string""#).unwrap(),
1049            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
1050        )
1051        .unwrap();
1052
1053        // Compare as parsed JSON values to handle key order randomness
1054        if let Value::String(result_str) = result {
1055            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
1056            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
1057            assert_eq!(
1058                expected_json, actual_json,
1059                "JSON values should be equivalent regardless of key order"
1060            );
1061        } else {
1062            panic!("Expected String value");
1063        };
1064    }
1065
1066    #[test]
1067    fn test_encode_avro_err() {
1068        test_err(
1069            &DataType::Interval,
1070            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
1071                -1,
1072                -1,
1073                i64::MAX,
1074            ))),
1075            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
1076            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
1077        );
1078
1079        let avro_schema = AvroSchema::parse_str(
1080            r#"{"type": "record", "name": "Root", "fields": [
1081                {"name": "f0", "type": "int"}
1082            ]}"#,
1083        )
1084        .unwrap();
1085        let mut record = Record::new(&avro_schema).unwrap();
1086        record.put("f0", Value::String("2".into()));
1087        let res: SinkResult<Vec<u8>> = AvroEncoded {
1088            value: Value::from(record),
1089            schema: Arc::new(avro_schema),
1090            header: AvroHeader::ConfluentSchemaRegistry(42),
1091        }
1092        .ser_to();
1093        assert_eq!(
1094            res.unwrap_err().to_string(),
1095            "Encode error: Value does not match schema"
1096        );
1097    }
1098
1099    #[test]
1100    fn test_encode_avro_record() {
1101        let avro_schema = AvroSchema::parse_str(
1102            r#"{
1103                "type": "record",
1104                "name": "Root",
1105                "fields": [
1106                    {"name": "req", "type": "int"},
1107                    {"name": "opt", "type": ["null", "long"]}
1108                ]
1109            }"#,
1110        )
1111        .unwrap();
1112        let avro_schema = Arc::new(avro_schema);
1113        let header = AvroHeader::None;
1114
1115        let schema = Schema::new(vec![
1116            Field::with_name(DataType::Int64, "opt"),
1117            Field::with_name(DataType::Int32, "req"),
1118        ]);
1119        let row = OwnedRow::new(vec![
1120            Some(ScalarImpl::Int64(31)),
1121            Some(ScalarImpl::Int32(15)),
1122        ]);
1123        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
1124        let actual = encoder.encode(row).unwrap();
1125        assert_eq!(
1126            actual.value,
1127            Value::Record(vec![
1128                ("req".into(), Value::Int(15)),
1129                ("opt".into(), Value::Union(1, Value::Long(31).into())),
1130            ])
1131        );
1132
1133        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
1134        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
1135        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
1136        let actual = encoder.encode(row).unwrap();
1137        assert_eq!(
1138            actual.value,
1139            Value::Record(vec![
1140                ("req".into(), Value::Int(15)),
1141                ("opt".into(), Value::Union(0, Value::Null.into())),
1142            ])
1143        );
1144
1145        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
1146        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
1147            panic!()
1148        };
1149        assert_eq!(
1150            err.to_string(),
1151            "Encode error: encode 'req' error: field not present but required"
1152        );
1153
1154        let schema = Schema::new(vec![
1155            Field::with_name(DataType::Int64, "opt"),
1156            Field::with_name(DataType::Int32, "req"),
1157            Field::with_name(DataType::Varchar, "extra"),
1158        ]);
1159        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
1160            panic!()
1161        };
1162        assert_eq!(
1163            err.to_string(),
1164            "Encode error: encode 'extra' error: field not in avro"
1165        );
1166
1167        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
1168        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
1169        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
1170            panic!()
1171        };
1172        assert_eq!(
1173            err.to_string(),
1174            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
1175        );
1176
1177        test_err(
1178            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
1179            (),
1180            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
1181            "encode 'f0' error: cannot encode boolean column as \"int\" field",
1182        );
1183    }
1184
1185    #[test]
1186    fn test_encode_avro_array() {
1187        let avro_schema = r#"{
1188            "type": "array",
1189            "items": "int"
1190        }"#;
1191
1192        test_ok(
1193            &DataType::Int32.list(),
1194            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
1195            avro_schema,
1196            Value::Array(vec![Value::Int(4), Value::Int(5)]),
1197        );
1198
1199        test_err(
1200            &DataType::Int32.list(),
1201            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
1202            avro_schema,
1203            "encode '' error: found null but required",
1204        );
1205
1206        test_ok(
1207            &DataType::Int32.list(),
1208            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
1209            r#"{
1210                "type": "array",
1211                "items": ["null", "int"]
1212            }"#,
1213            Value::Array(vec![
1214                Value::Union(1, Value::Int(4).into()),
1215                Value::Union(0, Value::Null.into()),
1216            ]),
1217        );
1218
1219        test_ok(
1220            &DataType::Int32.list().list(),
1221            Some(ScalarImpl::List(ListValue::from_iter([
1222                ListValue::from_iter([26, 29]),
1223                ListValue::from_iter([46, 49]),
1224            ]))),
1225            r#"{
1226                "type": "array",
1227                "items": {
1228                    "type": "array",
1229                    "items": "int"
1230                }
1231            }"#,
1232            Value::Array(vec![
1233                Value::Array(vec![Value::Int(26), Value::Int(29)]),
1234                Value::Array(vec![Value::Int(46), Value::Int(49)]),
1235            ]),
1236        );
1237
1238        test_err(
1239            &DataType::Boolean.list(),
1240            (),
1241            r#"{"type": "array", "items": "int"}"#,
1242            "encode '' error: cannot encode boolean column as \"int\" field",
1243        );
1244    }
1245
1246    #[test]
1247    fn test_encode_avro_union() {
1248        let t = &DataType::Timestamptz;
1249        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
1250        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
1251        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
1252        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
1253        let empty = "[]";
1254        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
1255        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;
1256
1257        test_ok(
1258            t,
1259            datum.clone(),
1260            opt_micros,
1261            Value::Union(1, Value::TimestampMicros(1500).into()),
1262        );
1263        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
1264        test_ok(
1265            t,
1266            datum.clone(),
1267            opt_millis,
1268            Value::Union(1, Value::TimestampMillis(1).into()),
1269        );
1270        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));
1271
1272        test_err(
1273            t,
1274            datum.to_datum_ref(),
1275            both,
1276            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
1277        );
1278
1279        test_err(
1280            t,
1281            datum.to_datum_ref(),
1282            empty,
1283            "encode '' error: cannot encode timestamp with time zone column as [] field",
1284        );
1285
1286        test_ok(
1287            t,
1288            datum.clone(),
1289            one,
1290            Value::Union(0, Value::TimestampMillis(1).into()),
1291        );
1292        test_err(t, None, one, "encode '' error: found null but required");
1293
1294        test_ok(
1295            t,
1296            datum.clone(),
1297            right,
1298            Value::Union(0, Value::TimestampMillis(1).into()),
1299        );
1300        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
1301    }
1302
1303    /// This just demonstrates bugs of the upstream [`apache_avro`], rather than our encoder.
1304    /// The encoder is not using these buggy calls and is already tested above.
1305    #[test]
1306    fn test_encode_avro_lib_bug() {
1307        use apache_avro::{Reader, Writer};
1308
1309        // a record with 2 optional int fields
1310        let avro_schema = AvroSchema::parse_str(
1311            r#"{
1312                "type": "record",
1313                "name": "Root",
1314                "fields": [
1315                    {
1316                        "name": "f0",
1317                        "type": ["null", "int"]
1318                    },
1319                    {
1320                        "name": "f1",
1321                        "type": ["null", "int"]
1322                    }
1323                ]
1324            }"#,
1325        )
1326        .unwrap();
1327
1328        let mut writer = Writer::new(&avro_schema, Vec::new());
1329        let mut record = Record::new(writer.schema()).unwrap();
1330        // f0 omitted, f1 = Int(3)
1331        record.put("f1", Value::Int(3));
1332        writer.append(record).unwrap();
1333        let encoded = writer.into_inner().unwrap();
1334        // writing produced no error, but read fails
1335        let reader = Reader::new(encoded.as_slice()).unwrap();
1336        for value in reader {
1337            assert_eq!(
1338                value.unwrap_err().to_string(),
1339                "Union index 3 out of bounds: 2"
1340            );
1341        }
1342
1343        let mut writer = Writer::new(&avro_schema, Vec::new());
1344        let mut record = Record::new(writer.schema()).unwrap();
1345        // f0 omitted, f1 = Union(1, Int(3))
1346        record.put("f1", Value::Union(1, Value::Int(3).into()));
1347        writer.append(record).unwrap();
1348        let encoded = writer.into_inner().unwrap();
1349        // writing produced no error, but read returns wrong value
1350        let reader = Reader::new(encoded.as_slice()).unwrap();
1351        for value in reader {
1352            assert_eq!(
1353                value.unwrap(),
1354                Value::Record(vec![
1355                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
1356                    ("f1".into(), Value::Union(0, Value::Null.into())),
1357                ])
1358            );
1359        }
1360    }
1361}