risingwave_connector/sink/encoder/avro.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use apache_avro::schema::{Name, RecordSchema, Schema as AvroSchema};
use apache_avro::types::{Record, Value};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_connector_codec::decoder::utils::rust_decimal_to_scaled_bigint;
use thiserror_ext::AsReport;

use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};

type Result<T> = std::result::Result<T, FieldEncodeError>;
struct NamesRef(HashMap<Name, AvroSchema>);

pub struct AvroEncoder {
    schema: Schema,
    col_indices: Option<Vec<usize>>,
    avro_schema: Arc<AvroSchema>,
    refs: NamesRef,
    header: AvroHeader,
}

#[derive(Debug, Clone, Copy)]
pub enum AvroHeader {
    None,
    /// <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
    ///
    /// * C3 01
    /// * 8-byte little-endian CRC-64-AVRO fingerprint
    SingleObject,
    /// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
    ///
    /// * 4F 62 6A 01
    /// * schema
    /// * 16-byte random sync marker
    ContainerFile,
    /// <https://docs.confluent.io/platform/7.5/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format>
    ///
    /// * 00
    /// * 4-byte big-endian schema ID
    ConfluentSchemaRegistry(i32),
    /// <https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.20/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L59-L61>
    ///
    /// * 03
    /// * 00
    /// * 16-byte UUID identifying a specific schema version
    GlueSchemaRegistry(uuid::Uuid),
}
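
// Illustration only (not part of the original file): a minimal sketch of the header
// byte layouts documented on the variants above, mirroring what `SerTo::ser_to`
// emits further below. The schema id 42 and the nil UUID are example values.
#[cfg(test)]
mod header_layout_example {
    #[test]
    fn confluent_and_glue_header_layouts() {
        use bytes::BufMut as _;

        // Confluent wire format: magic byte 0x00 followed by the 4-byte big-endian schema id.
        let mut confluent = Vec::new();
        confluent.put_u8(0);
        confluent.put_i32(42);
        assert_eq!(confluent, [0x00, 0x00, 0x00, 0x00, 0x2A]);

        // Glue wire format: version byte 0x03, compression byte 0x00, then the 16 raw UUID bytes.
        let mut glue = Vec::new();
        glue.put_u8(3);
        glue.put_u8(0);
        glue.put_slice(uuid::Uuid::nil().as_bytes());
        assert_eq!(glue.len(), 18);
        assert_eq!(glue[..2], [0x03, 0x00]);
    }
}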

impl AvroEncoder {
    pub fn new(
        schema: Schema,
        col_indices: Option<Vec<usize>>,
        avro_schema: Arc<AvroSchema>,
        header: AvroHeader,
    ) -> SinkResult<Self> {
        let refs = NamesRef::new(&avro_schema)?;
        match &col_indices {
            Some(col_indices) => validate_fields(
                col_indices.iter().map(|idx| {
                    let f = &schema[*idx];
                    (f.name.as_str(), &f.data_type)
                }),
                &avro_schema,
                &refs,
            )?,
            None => validate_fields(
                schema
                    .fields
                    .iter()
                    .map(|f| (f.name.as_str(), &f.data_type)),
                &avro_schema,
                &refs,
            )?,
        };

        Ok(Self {
            schema,
            col_indices,
            avro_schema,
            refs,
            header,
        })
    }
}

impl NamesRef {
    fn new(root: &AvroSchema) -> std::result::Result<Self, apache_avro::Error> {
        let resolved = apache_avro::schema::ResolvedSchema::try_from(root)?;
        let refs = resolved
            .get_names()
            .iter()
            .map(|(k, v)| (k.to_owned(), (*v).to_owned()))
            .collect();
        Ok(Self(refs))
    }

    fn lookup<'a>(&'a self, avro: &'a AvroSchema) -> &'a AvroSchema {
        match avro {
            AvroSchema::Ref { name } => &self.0[name],
            _ => avro,
        }
    }
}
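
// Illustration only (not part of the original file): `NamesRef::lookup` resolves a
// named reference (`AvroSchema::Ref`) back to its full definition, so a record type
// that is referenced by name, like `"Point"` below, can still be traversed during
// validation and encoding.
#[cfg(test)]
mod names_ref_example {
    use super::*;

    #[test]
    fn lookup_resolves_named_reference() {
        let root = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {"name": "p", "type": {"type": "record", "name": "Point", "fields": [
                        {"name": "x", "type": "int"}
                    ]}},
                    {"name": "q", "type": "Point"}
                ]
            }"#,
        )
        .unwrap();
        let refs = NamesRef::new(&root).unwrap();
        let AvroSchema::Record(RecordSchema { fields, .. }) = &root else {
            unreachable!()
        };
        // Whether the parser stored field `q` inline or as a `Ref`, lookup yields the record.
        assert!(matches!(
            refs.lookup(&fields[1].schema),
            AvroSchema::Record(_)
        ));
    }
}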

pub struct AvroEncoded {
    value: Value,
    schema: Arc<AvroSchema>,
    header: AvroHeader,
}

impl RowEncoder for AvroEncoder {
    type Output = AvroEncoded;

    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn col_indices(&self) -> Option<&[usize]> {
        self.col_indices.as_deref()
    }

    fn encode_cols(
        &self,
        row: impl Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> SinkResult<Self::Output> {
        let record = encode_fields(
            col_indices.map(|idx| {
                let f = &self.schema[idx];
                ((f.name.as_str(), &f.data_type), row.datum_at(idx))
            }),
            &self.avro_schema,
            &self.refs,
        )?;
        Ok(AvroEncoded {
            value: record.into(),
            schema: self.avro_schema.clone(),
            header: self.header,
        })
    }
}

impl SerTo<Vec<u8>> for AvroEncoded {
    fn ser_to(self) -> SinkResult<Vec<u8>> {
        use bytes::BufMut as _;

        let header = match self.header {
            AvroHeader::ConfluentSchemaRegistry(schema_id) => {
                let mut buf = Vec::with_capacity(1 + 4);
                buf.put_u8(0);
                buf.put_i32(schema_id);
                buf
            }
            AvroHeader::GlueSchemaRegistry(schema_version_id) => {
                let mut buf = Vec::with_capacity(1 + 1 + 16);
                buf.put_u8(3);
                buf.put_u8(0);
                buf.put_slice(schema_version_id.as_bytes());
                buf
            }
            AvroHeader::None | AvroHeader::SingleObject | AvroHeader::ContainerFile => {
                return Err(crate::sink::SinkError::Encode(format!(
                    "{:?} unsupported yet",
                    self.header
                )));
            }
        };

        let raw = apache_avro::to_avro_datum(&self.schema, self.value)
            .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
        let mut buf = Vec::with_capacity(header.len() + raw.len());
        buf.put_slice(&header);
        buf.put_slice(&raw);

        Ok(buf)
    }
}

enum OptIdx {
    /// `T`
    NotUnion,
    /// `[T]`
    Single,
    /// `[null, T]`
    NullLeft,
    /// `[T, null]`
    NullRight,
}
/// A trait that assists code reuse between `validate` and `encode`.
/// * For `validate`, the inputs are (RisingWave type, Avro type).
/// * For `encode`, the inputs are (RisingWave type, RisingWave data, Avro type).
///
/// Thus we impl [`MaybeData`] for both [`()`] and [`DatumRef`].
trait MaybeData: std::fmt::Debug {
    type Out;

    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out>;

    /// Switch to `RecordSchema` after #12562
    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out>;

    fn on_map(
        self,
        value_type: &DataType,
        avro_value_schema: &AvroSchema,
        refs: &NamesRef,
    ) -> Result<Self::Out>;

    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out>;
}

impl MaybeData for () {
    type Out = ();

    fn on_base(self, _: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
        Ok(self)
    }

    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        validate_fields(st.iter(), avro, refs)
    }

    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        on_field(elem, (), avro, refs)
    }

    fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        on_field(elem, (), avro, refs)
    }

    fn handle_nullable_union(out: Self::Out, _: OptIdx) -> Result<Self::Out> {
        Ok(out)
    }
}

impl MaybeData for DatumRef<'_> {
    type Out = Value;

    fn on_base(self, f: impl FnOnce(ScalarRefImpl<'_>) -> Result<Value>) -> Result<Self::Out> {
        match self {
            Some(s) => f(s),
            None => Ok(Value::Null),
        }
    }

    fn on_struct(self, st: &StructType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        let d = match self {
            Some(s) => s.into_struct(),
            None => return Ok(Value::Null),
        };
        let record = encode_fields(st.iter().zip_eq_debug(d.iter_fields_ref()), avro, refs)?;
        Ok(record.into())
    }

    fn on_list(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        let d = match self {
            Some(s) => s.into_list(),
            None => return Ok(Value::Null),
        };
        let vs = d
            .iter()
            .map(|d| on_field(elem, d, avro, refs))
            .try_collect()?;
        Ok(Value::Array(vs))
    }

    fn on_map(self, elem: &DataType, avro: &AvroSchema, refs: &NamesRef) -> Result<Self::Out> {
        let d = match self {
            Some(s) => s.into_map(),
            None => return Ok(Value::Null),
        };
        let vs = d
            .iter()
            .map(|(k, v)| {
                let k = k.into_utf8().to_owned();
                let v = on_field(elem, v, avro, refs)?;
                Ok((k, v))
            })
            .try_collect()?;
        Ok(Value::Map(vs))
    }

    fn handle_nullable_union(out: Self::Out, opt_idx: OptIdx) -> Result<Self::Out> {
        use OptIdx::*;

        match out == Value::Null {
            true => {
                let ni = match opt_idx {
                    NotUnion | Single => {
                        return Err(FieldEncodeError::new("found null but required"));
                    }
                    NullLeft => 0,
                    NullRight => 1,
                };
                Ok(Value::Union(ni, out.into()))
            }
            false => {
                let vi = match opt_idx {
                    NotUnion => return Ok(out),
                    NullLeft => 1,
                    Single | NullRight => 0,
                };
                Ok(Value::Union(vi, out.into()))
            }
        }
    }
}
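
// Illustration only (not part of the original file): the same `on_field` entry point
// drives both phases described on the `MaybeData` trait above. Passing `()` merely
// validates that the RisingWave type fits the Avro type; passing a `DatumRef`
// additionally produces the Avro value.
#[cfg(test)]
mod maybe_data_example {
    use super::*;

    #[test]
    fn validate_then_encode_int() {
        let avro = AvroSchema::parse_str(r#""int""#).unwrap();
        let refs = NamesRef::new(&avro).unwrap();
        // validate: only the (RisingWave type, Avro type) pair is checked
        on_field(&DataType::Int32, (), &avro, &refs).unwrap();
        // encode: the datum is mapped to an Avro value
        let value = on_field(&DataType::Int32, Some(ScalarRefImpl::Int32(7)), &avro, &refs)
            .unwrap();
        assert_eq!(value, Value::Int(7));
    }
}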

fn validate_fields<'rw>(
    rw_fields: impl Iterator<Item = (&'rw str, &'rw DataType)>,
    avro: &AvroSchema,
    refs: &NamesRef,
) -> Result<()> {
    let avro = refs.lookup(avro);
    let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else {
        return Err(FieldEncodeError::new(format!(
            "expect avro record but got {}",
            avro.canonical_form(),
        )));
    };
    let mut present = vec![false; fields.len()];
    for (name, t) in rw_fields {
        let Some(&idx) = lookup.get(name) else {
            return Err(FieldEncodeError::new("field not in avro").with_name(name));
        };
        present[idx] = true;
        let avro_field = &fields[idx];
        on_field(t, (), &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
    }
    for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
        if p {
            continue;
        }
        if !avro_field.is_nullable() {
            return Err(
                FieldEncodeError::new("field not present but required").with_name(&avro_field.name)
            );
        }
    }
    Ok(())
}

fn encode_fields<'avro, 'rw>(
    fields_with_datums: impl Iterator<Item = ((&'rw str, &'rw DataType), DatumRef<'rw>)>,
    schema: &'avro AvroSchema,
    refs: &'avro NamesRef,
) -> Result<Record<'avro>> {
    let schema = refs.lookup(schema);
    let mut record = Record::new(schema).unwrap();
    let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else {
        unreachable!()
    };
    let mut present = vec![false; fields.len()];
    for ((name, t), d) in fields_with_datums {
        let idx = lookup[name];
        present[idx] = true;
        let avro_field = &fields[idx];
        let value = on_field(t, d, &avro_field.schema, refs).map_err(|e| e.with_name(name))?;
        record.put(name, value);
    }
    // Unfortunately, the upstream `apache_avro` does not handle missing nullable fields correctly.
    // The correct encoding is `Value::Union(null_index, Value::Null)`, but it simply writes `Value::Null`.
    // See [`tests::test_encode_avro_lib_bug`].
    for (p, avro_field) in present.into_iter().zip_eq_fast(fields) {
        if p {
            continue;
        }
        let AvroSchema::Union(u) = &avro_field.schema else {
            unreachable!()
        };
        // We could have saved the null index of each field during [`validate_fields`] to avoid this
        // repeated lookup. But in most cases it is the 0th variant.
        // Alternatively, we could simplify by enforcing the best practice of putting `null` at index 0.
        let ni = u
            .variants()
            .iter()
            .position(|a| a == &AvroSchema::Null)
            .unwrap();
        record.put(
            &avro_field.name,
            Value::Union(ni.try_into().unwrap(), Value::Null.into()),
        );
    }
    Ok(record)
}

/// Handles both `validate` (without actual data) and `encode`.
/// See [`MaybeData`] for more info.
fn on_field<D: MaybeData>(
    data_type: &DataType,
    maybe: D,
    expected: &AvroSchema,
    refs: &NamesRef,
) -> Result<D::Out> {
    use risingwave_common::types::Interval;

    let no_match_err = || {
        Err(FieldEncodeError::new(format!(
            "cannot encode {} column as {} field",
            data_type,
            expected.canonical_form()
        )))
    };

    // For now, we only support an optional single type rather than a general union.
    // For example, how would we encode an int16 into the Avro union `["int", "long"]`?
    let (inner, opt_idx) = match expected {
        AvroSchema::Union(union) => match union.variants() {
            [] => return no_match_err(),
            [one] => (one, OptIdx::Single),
            [AvroSchema::Null, r] => (r, OptIdx::NullLeft),
            [l, AvroSchema::Null] => (l, OptIdx::NullRight),
            _ => return no_match_err(),
        },
        _ => (expected, OptIdx::NotUnion),
    };

    let inner = refs.lookup(inner);

    let value = match &data_type {
        // Group A: perfect match between RisingWave types and Avro types
        DataType::Boolean => match inner {
            AvroSchema::Boolean => maybe.on_base(|s| Ok(Value::Boolean(s.into_bool())))?,
            _ => return no_match_err(),
        },
        DataType::Varchar => match inner {
            AvroSchema::String => maybe.on_base(|s| Ok(Value::String(s.into_utf8().into())))?,
            _ => return no_match_err(),
        },
        DataType::Bytea => match inner {
            AvroSchema::Bytes => maybe.on_base(|s| Ok(Value::Bytes(s.into_bytea().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float32 => match inner {
            AvroSchema::Float => maybe.on_base(|s| Ok(Value::Float(s.into_float32().into())))?,
            _ => return no_match_err(),
        },
        DataType::Float64 => match inner {
            AvroSchema::Double => maybe.on_base(|s| Ok(Value::Double(s.into_float64().into())))?,
            _ => return no_match_err(),
        },
        DataType::Int32 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int32())))?,
            _ => return no_match_err(),
        },
        DataType::Int64 => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_int64())))?,
            _ => return no_match_err(),
        },
        DataType::Serial => match inner {
            AvroSchema::Long => maybe.on_base(|s| Ok(Value::Long(s.into_serial().into_inner())))?,
            _ => return no_match_err(),
        },
        DataType::Struct(st) => match inner {
            AvroSchema::Record { .. } => maybe.on_struct(st, inner, refs)?,
            _ => return no_match_err(),
        },
        DataType::List(elem) => match inner {
            AvroSchema::Array(avro_elem) => maybe.on_list(elem, avro_elem, refs)?,
            _ => return no_match_err(),
        },
        DataType::Map(m) => {
            if *m.key() != DataType::Varchar {
                return no_match_err();
            }
            match inner {
                AvroSchema::Map(avro_value_type) => {
                    maybe.on_map(m.value(), avro_value_type, refs)?
                }
                _ => return no_match_err(),
            }
        }

        // Group B: match between RisingWave types and Avro logical types
        DataType::Timestamptz => match inner {
            AvroSchema::TimestampMicros => maybe.on_base(|s| {
                Ok(Value::TimestampMicros(
                    s.into_timestamptz().timestamp_micros(),
                ))
            })?,
            AvroSchema::TimestampMillis => maybe.on_base(|s| {
                Ok(Value::TimestampMillis(
                    s.into_timestamptz().timestamp_millis(),
                ))
            })?,
            _ => return no_match_err(),
        },
        DataType::Timestamp => return no_match_err(),
        DataType::Date => match inner {
            AvroSchema::Date => {
                maybe.on_base(|s| Ok(Value::Date(s.into_date().get_nums_days_unix_epoch())))?
            }
            _ => return no_match_err(),
        },
        DataType::Time => match inner {
            AvroSchema::TimeMicros => {
                maybe.on_base(|s| Ok(Value::TimeMicros(Interval::from(s.into_time()).usecs())))?
            }
            AvroSchema::TimeMillis => maybe.on_base(|s| {
                Ok(Value::TimeMillis(
                    (Interval::from(s.into_time()).usecs() / 1000)
                        .try_into()
                        .unwrap(),
                ))
            })?,
            _ => return no_match_err(),
        },
        DataType::Interval => match inner {
            AvroSchema::Duration => maybe.on_base(|s| {
                use apache_avro::{Days, Duration, Millis, Months};
                let iv = s.into_interval();

                let overflow = |_| FieldEncodeError::new(format!("{iv} overflows avro duration"));

                Ok(Value::Duration(Duration::new(
                    Months::new(iv.months().try_into().map_err(overflow)?),
                    Days::new(iv.days().try_into().map_err(overflow)?),
                    Millis::new((iv.usecs() / 1000).try_into().map_err(overflow)?),
                )))
            })?,
            _ => return no_match_err(),
        },
        // Group C: experimental
        DataType::Int16 => match inner {
            AvroSchema::Int => maybe.on_base(|s| Ok(Value::Int(s.into_int16() as i32)))?,
            _ => return no_match_err(),
        },
        DataType::Decimal => match inner {
            AvroSchema::Decimal(decimal_schema) => {
                maybe.on_base(|s| {
                    match s.into_decimal() {
                        risingwave_common::types::Decimal::Normalized(decimal) => {
                            // convert to bigint with scale
                            // rescale the rust_decimal to the scale of the avro decimal
                            //
                            // From bigdecimal::BigDecimal::with_scale:
                            // If the new_scale is lower than the current value (indicating a larger
                            // power of 10), digits will be dropped (as precision is lower)
                            let signed_bigint_bytes =
                                rust_decimal_to_scaled_bigint(decimal, decimal_schema.scale)
                                    .map_err(FieldEncodeError::new)?;
                            Ok(Value::Decimal(apache_avro::Decimal::from(
                                &signed_bigint_bytes,
                            )))
                        }
                        d @ risingwave_common::types::Decimal::NaN
                        | d @ risingwave_common::types::Decimal::NegativeInf
                        | d @ risingwave_common::types::Decimal::PositiveInf => {
                            Err(FieldEncodeError::new(format!(
                                "Avro Decimal does not support NaN or Inf, but got {}",
                                d
                            )))
                        }
                    }
                })?
            }
            _ => return no_match_err(),
        },
        DataType::Jsonb => match inner {
            AvroSchema::String => {
                maybe.on_base(|s| Ok(Value::String(s.into_jsonb().to_string())))?
            }
            _ => return no_match_err(),
        },
        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        // Group D: unsupported
        DataType::Int256 => {
            return no_match_err();
        }
    };

    D::handle_nullable_union(value, opt_idx)
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use expect_test::expect;
    use itertools::Itertools;
    use risingwave_common::array::{ArrayBuilder, MapArrayBuilder};
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Datum, Interval, JsonbVal, ListValue, MapType, MapValue, Scalar, ScalarImpl,
        StructValue, Time, Timestamptz, ToDatumRef,
    };

    use super::*;

    #[track_caller]
    fn test_ok(rw_type: &DataType, rw_datum: Datum, avro_type: &str, expected: Value) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let actual = on_field(rw_type, rw_datum.to_datum_ref(), &avro_schema, &refs).unwrap();
        assert_eq!(actual, expected);
    }

    #[track_caller]
    fn test_err<D: MaybeData>(t: &DataType, d: D, avro: &str, expected: &str)
    where
        D::Out: std::fmt::Debug,
    {
        let avro_schema = AvroSchema::parse_str(avro).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let err = on_field(t, d, &avro_schema, &refs).unwrap_err();
        assert_eq!(err.to_string(), expected);
    }

    #[track_caller]
    fn test_v2(rw_type: &str, rw_scalar: &str, avro_type: &str, expected: expect_test::Expect) {
        let avro_schema = AvroSchema::parse_str(avro_type).unwrap();
        let refs = NamesRef::new(&avro_schema).unwrap();
        let rw_type = DataType::from_str(rw_type).unwrap();
        let rw_datum = ScalarImpl::from_text_for_test(rw_scalar, &rw_type).unwrap();

        if let Err(validate_err) = on_field(&rw_type, (), &avro_schema, &refs) {
            expected.assert_debug_eq(&validate_err);
            return;
        }
        let actual = on_field(&rw_type, Some(rw_datum).to_datum_ref(), &avro_schema, &refs);
        match actual {
            Ok(v) => expected.assert_eq(&print_avro_value(&v)),
            Err(e) => expected.assert_debug_eq(&e),
        }
    }

    fn print_avro_value(v: &Value) -> String {
        match v {
            Value::Map(m) => {
                let mut res = "Map({".to_owned();
                for (k, v) in m.iter().sorted_by_key(|x| x.0) {
                    res.push_str(&format!("{}: {}, ", k, print_avro_value(v)));
                }
                res.push_str("})");
                res
            }
            _ => format!("{v:?}"),
        }
    }

    #[test]
    fn test_encode_v2() {
        test_v2(
            "boolean",
            "false",
            r#""int""#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode boolean column as \"int\" field",
                    rev_path: [],
                }
            "#]],
        );
        test_v2("boolean", "true", r#""boolean""#, expect!["Boolean(true)"]);

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:2,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"Map({1: String("1"), 2: String("2"), 3: String("3"), })"#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": "string"}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "found null but required",
                    rev_path: [],
                }
            "#]],
        );

        test_v2(
            "map(varchar,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[
                r#"Map({1: Union(1, String("1")), 2: Union(0, Null), 3: Union(1, String("3")), })"#
            ]],
        );

        test_v2(
            "map(int,varchar)",
            "{1:1,2:NULL,3:3}",
            r#"{"type": "map","values": ["null", "string"]}"#,
            expect![[r#"
                FieldEncodeError {
                    message: "cannot encode map(integer,character varying) column as {\"type\":\"map\",\"values\":[\"null\",\"string\"]} field",
                    rev_path: [],
                }
            "#]],
        );
    }

    #[test]
    fn test_encode_avro_ok() {
        test_ok(
            &DataType::Boolean,
            Some(ScalarImpl::Bool(false)),
            r#""boolean""#,
            Value::Boolean(false),
        );

        test_ok(
            &DataType::Varchar,
            Some(ScalarImpl::Utf8("RisingWave".into())),
            r#""string""#,
            Value::String("RisingWave".into()),
        );

        test_ok(
            &DataType::Bytea,
            Some(ScalarImpl::Bytea([0xbe, 0xef].into())),
            r#""bytes""#,
            Value::Bytes([0xbe, 0xef].into()),
        );

        test_ok(
            &DataType::Float32,
            Some(ScalarImpl::Float32(3.5f32.into())),
            r#""float""#,
            Value::Float(3.5f32),
        );

        test_ok(
            &DataType::Float64,
            Some(ScalarImpl::Float64(4.25f64.into())),
            r#""double""#,
            Value::Double(4.25f64),
        );

        test_ok(
            &DataType::Int32,
            Some(ScalarImpl::Int32(16)),
            r#""int""#,
            Value::Int(16),
        );

        test_ok(
            &DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX)),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        test_ok(
            &DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into())),
            r#""long""#,
            Value::Long(i64::MAX),
        );

        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Value::TimestampMicros(tstz.timestamp_micros()),
        );
        test_ok(
            &DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz)),
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            Value::TimestampMillis(tstz.timestamp_millis()),
        );

        test_ok(
            &DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 2))),
            r#"{"type": "int", "logicalType": "date"}"#,
            Value::Date(1),
        );

        let tm = Time::from_num_seconds_from_midnight_uncheck(1000, 0);
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            Value::TimeMicros(1000 * 1_000_000),
        );
        test_ok(
            &DataType::Time,
            Some(ScalarImpl::Time(tm)),
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            Value::TimeMillis(1000 * 1000),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MAX)),
            r#""int""#,
            Value::Int(i16::MAX as i32),
        );

        test_ok(
            &DataType::Int16,
            Some(ScalarImpl::Int16(i16::MIN)),
            r#""int""#,
            Value::Int(i16::MIN as i32),
        );

        test_ok(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(
                JsonbVal::from_str(r#"{"a": 1}"#).unwrap(),
            )),
            r#""string""#,
            Value::String(r#"{"a": 1}"#.into()),
        );

        test_ok(
            &DataType::Interval,
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(
                13, 2, 1000000,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            Value::Duration(apache_avro::Duration::new(
                apache_avro::Months::new(13),
                apache_avro::Days::new(2),
                apache_avro::Millis::new(1000),
            )),
        );

        let mut inner_map_array_builder = MapArrayBuilder::with_type(
            2,
            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
        );
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["a", "b"]),
                ListValue::from_iter([1, 2]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        inner_map_array_builder.append(Some(
            MapValue::try_from_kv(
                ListValue::from_iter(["c", "d"]),
                ListValue::from_iter([3, 4]),
            )
            .unwrap()
            .as_scalar_ref(),
        ));
        let inner_map_array = inner_map_array_builder.finish();
        test_ok(
            &DataType::Map(MapType::from_kv(
                DataType::Varchar,
                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Int32)),
            )),
            Some(ScalarImpl::Map(
                MapValue::try_from_kv(
                    ListValue::from_iter(["k1", "k2"]),
                    ListValue::new(inner_map_array.into()),
                )
                .unwrap(),
            )),
            r#"{"type": "map","values": {"type": "map","values": "int"}}"#,
            Value::Map(HashMap::from_iter([
                (
                    "k1".into(),
                    Value::Map(HashMap::from_iter([
                        ("a".into(), Value::Int(1)),
                        ("b".into(), Value::Int(2)),
                    ])),
                ),
                (
                    "k2".into(),
                    Value::Map(HashMap::from_iter([
                        ("c".into(), Value::Int(3)),
                        ("d".into(), Value::Int(4)),
                    ])),
                ),
            ])),
        );

        test_ok(
            &DataType::Struct(StructType::new(vec![
                (
                    "p",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
                (
                    "q",
                    DataType::Struct(StructType::new(vec![
                        ("x", DataType::Int32),
                        ("y", DataType::Int32),
                    ])),
                ),
            ])),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(-2)),
                    Some(ScalarImpl::Int32(-1)),
                ]))),
                Some(ScalarImpl::Struct(StructValue::new(vec![
                    Some(ScalarImpl::Int32(2)),
                    Some(ScalarImpl::Int32(1)),
                ]))),
            ]))),
            r#"{
                "type": "record",
                "name": "Segment",
                "fields": [
                    {
                        "name": "p",
                        "type": {
                            "type": "record",
                            "name": "Point",
                            "fields": [
                                {
                                    "name": "x",
                                    "type": "int"
                                },
                                {
                                    "name": "y",
                                    "type": "int"
                                }
                            ]
                        }
                    },
                    {
                        "name": "q",
                        "type": "Point"
                    }
                ]
            }"#,
            Value::Record(vec![
                (
                    "p".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(-2)),
                        ("y".to_owned(), Value::Int(-1)),
                    ]),
                ),
                (
                    "q".to_owned(),
                    Value::Record(vec![
                        ("x".to_owned(), Value::Int(2)),
                        ("y".to_owned(), Value::Int(1)),
                    ]),
                ),
            ]),
        );

        // Test complex JSON with nested structures - using serde_json::Value comparison
        let complex_json = r#"{
            "person": {
                "name": "John Doe",
                "age": 30,
                "address": {
                    "street": "123 Main St.",
                    "city": "New York",
                    "coordinates": [40.7128, -74.0060]
                },
                "contacts": [
                    {"type": "email", "value": "john@example.com"},
                    {"type": "phone", "value": "+1-555-123-4567"}
                ],
                "active": true,
                "preferences": {
                    "notifications": true,
                    "theme": "dark",
                    "languages": ["en", "es"],
                    "lastLogin": null
                },
                "tags": ["premium", "verified"],
                "unicode_test": "Hello, 世界! 🌍"
            }
        }"#;

        let input_json = JsonbVal::from_str(complex_json).unwrap();
        let result = on_field(
            &DataType::Jsonb,
            Some(ScalarImpl::Jsonb(input_json.clone())).to_datum_ref(),
            &AvroSchema::parse_str(r#""string""#).unwrap(),
            &NamesRef::new(&AvroSchema::parse_str(r#""string""#).unwrap()).unwrap(),
        )
        .unwrap();

        // Compare as parsed JSON values to handle key order randomness
        if let Value::String(result_str) = result {
            let expected_json: serde_json::Value = serde_json::from_str(complex_json).unwrap();
            let actual_json: serde_json::Value = serde_json::from_str(&result_str).unwrap();
            assert_eq!(
                expected_json, actual_json,
                "JSON values should be equivalent regardless of key order"
            );
        } else {
            panic!("Expected String value");
        };
    }

    #[test]
    fn test_encode_avro_err() {
        test_err(
            &DataType::Interval,
            Some(ScalarRefImpl::Interval(Interval::from_month_day_usec(
                -1,
                -1,
                i64::MAX,
            ))),
            r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
            "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
        );

        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::String("2".into()));
        let res: SinkResult<Vec<u8>> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::ConfluentSchemaRegistry(42),
        }
        .ser_to();
        assert_eq!(
            res.unwrap_err().to_string(),
            "Encode error: Value does not match schema"
        );
    }
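
    /// Illustration only (not part of the original test suite): exercises the Glue
    /// header path of `ser_to`, checking the layout documented on
    /// `AvroHeader::GlueSchemaRegistry`. The nil UUID is an example value.
    #[test]
    fn test_encode_avro_glue_header() {
        let avro_schema = AvroSchema::parse_str(
            r#"{"type": "record", "name": "Root", "fields": [
                {"name": "f0", "type": "int"}
            ]}"#,
        )
        .unwrap();
        let mut record = Record::new(&avro_schema).unwrap();
        record.put("f0", Value::Int(7));
        let bytes: Vec<u8> = AvroEncoded {
            value: Value::from(record),
            schema: Arc::new(avro_schema),
            header: AvroHeader::GlueSchemaRegistry(uuid::Uuid::nil()),
        }
        .ser_to()
        .unwrap();
        // 1-byte version, 1-byte compression, 16-byte schema version UUID, then the Avro datum.
        assert_eq!(bytes[..2], [0x03, 0x00]);
        assert_eq!(bytes[2..18], *uuid::Uuid::nil().as_bytes());
        assert!(bytes.len() > 18);
    }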

    #[test]
    fn test_encode_avro_record() {
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {"name": "req", "type": "int"},
                    {"name": "opt", "type": ["null", "long"]}
                ]
            }"#,
        )
        .unwrap();
        let avro_schema = Arc::new(avro_schema);
        let header = AvroHeader::None;

        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
        ]);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(31)),
            Some(ScalarImpl::Int32(15)),
        ]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(1, Value::Long(31).into())),
            ])
        );

        let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
        let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap();
        let actual = encoder.encode(row).unwrap();
        assert_eq!(
            actual.value,
            Value::Record(vec![
                ("req".into(), Value::Int(15)),
                ("opt".into(), Value::Union(0, Value::Null.into())),
            ])
        );

        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'req' error: field not present but required"
        );

        let schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "opt"),
            Field::with_name(DataType::Int32, "req"),
            Field::with_name(DataType::Varchar, "extra"),
        ]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            "Encode error: encode 'extra' error: field not in avro"
        );

        let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
        let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
        let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else {
            panic!()
        };
        assert_eq!(
            err.to_string(),
            r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
        );

        test_err(
            &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
            (),
            r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
            "encode 'f0' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_array() {
        let avro_schema = r#"{
            "type": "array",
            "items": "int"
        }"#;

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([4, 5]))),
            avro_schema,
            Value::Array(vec![Value::Int(4), Value::Int(5)]),
        );

        test_err(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
            avro_schema,
            "encode '' error: found null but required",
        );

        test_ok(
            &DataType::List(DataType::Int32.into()),
            Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))),
            r#"{
                "type": "array",
                "items": ["null", "int"]
            }"#,
            Value::Array(vec![
                Value::Union(1, Value::Int(4).into()),
                Value::Union(0, Value::Null.into()),
            ]),
        );

        test_ok(
            &DataType::List(DataType::List(DataType::Int32.into()).into()),
            Some(ScalarImpl::List(ListValue::from_iter([
                ListValue::from_iter([26, 29]),
                ListValue::from_iter([46, 49]),
            ]))),
            r#"{
                "type": "array",
                "items": {
                    "type": "array",
                    "items": "int"
                }
            }"#,
            Value::Array(vec![
                Value::Array(vec![Value::Int(26), Value::Int(29)]),
                Value::Array(vec![Value::Int(46), Value::Int(49)]),
            ]),
        );

        test_err(
            &DataType::List(DataType::Boolean.into()),
            (),
            r#"{"type": "array", "items": "int"}"#,
            "encode '' error: cannot encode boolean column as \"int\" field",
        );
    }

    #[test]
    fn test_encode_avro_union() {
        let t = &DataType::Timestamptz;
        let datum = Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(1500)));
        let opt_micros = r#"["null", {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let opt_millis = r#"["null", {"type": "long", "logicalType": "timestamp-millis"}]"#;
        let both = r#"[{"type": "long", "logicalType": "timestamp-millis"}, {"type": "long", "logicalType": "timestamp-micros"}]"#;
        let empty = "[]";
        let one = r#"[{"type": "long", "logicalType": "timestamp-millis"}]"#;
        let right = r#"[{"type": "long", "logicalType": "timestamp-millis"}, "null"]"#;

        test_ok(
            t,
            datum.clone(),
            opt_micros,
            Value::Union(1, Value::TimestampMicros(1500).into()),
        );
        test_ok(t, None, opt_micros, Value::Union(0, Value::Null.into()));
        test_ok(
            t,
            datum.clone(),
            opt_millis,
            Value::Union(1, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, opt_millis, Value::Union(0, Value::Null.into()));

        test_err(
            t,
            datum.to_datum_ref(),
            both,
            r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
        );

        test_err(
            t,
            datum.to_datum_ref(),
            empty,
            "encode '' error: cannot encode timestamp with time zone column as [] field",
        );

        test_ok(
            t,
            datum.clone(),
            one,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_err(t, None, one, "encode '' error: found null but required");

        test_ok(
            t,
            datum.clone(),
            right,
            Value::Union(0, Value::TimestampMillis(1).into()),
        );
        test_ok(t, None, right, Value::Union(1, Value::Null.into()));
    }
    /// This just demonstrates bugs in the upstream [`apache_avro`], rather than in our encoder.
    /// The encoder does not use these buggy calls and is already tested above.
    #[test]
    fn test_encode_avro_lib_bug() {
        use apache_avro::{Reader, Writer};

        // a record with 2 optional int fields
        let avro_schema = AvroSchema::parse_str(
            r#"{
                "type": "record",
                "name": "Root",
                "fields": [
                    {
                        "name": "f0",
                        "type": ["null", "int"]
                    },
                    {
                        "name": "f1",
                        "type": ["null", "int"]
                    }
                ]
            }"#,
        )
        .unwrap();

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        // f0 omitted, f1 = Int(3)
        record.put("f1", Value::Int(3));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        // writing produced no error, but read fails
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap_err().to_string(),
                "Union index 3 out of bounds: 2"
            );
        }

        let mut writer = Writer::new(&avro_schema, Vec::new());
        let mut record = Record::new(writer.schema()).unwrap();
        // f0 omitted, f1 = Union(1, Int(3))
        record.put("f1", Value::Union(1, Value::Int(3).into()));
        writer.append(record).unwrap();
        let encoded = writer.into_inner().unwrap();
        // writing produced no error, but read returns wrong value
        let reader = Reader::new(encoded.as_slice()).unwrap();
        for value in reader {
            assert_eq!(
                value.unwrap(),
                Value::Record(vec![
                    ("f0".into(), Value::Union(1, Value::Int(3).into())),
                    ("f1".into(), Value::Union(0, Value::Null.into())),
                ])
            );
        }
    }
}