risingwave_connector_codec/decoder/avro/schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use anyhow::Context;
18use apache_avro::AvroResult;
19use apache_avro::schema::{DecimalSchema, NamesRef, RecordSchema, ResolvedSchema, Schema};
20use itertools::Itertools;
21use risingwave_common::catalog::Field;
22use risingwave_common::error::NotImplemented;
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{DataType, Decimal, MapType, StructType};
25use risingwave_common::{bail, bail_not_implemented};
26
27use super::get_nullable_union_inner;
28
29/// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved.
30///
31/// TODO: Actually most of the place should use resolved schema, but currently they just happen to work (Some edge cases are not met yet).
32///
33/// TODO: refactor avro lib to use the feature there.
34#[derive(Debug)]
35pub struct ResolvedAvroSchema {
36    /// Should be used for parsing bytes into Avro value
37    pub original_schema: Arc<Schema>,
38}
39
40impl ResolvedAvroSchema {
41    pub fn create(schema: Arc<Schema>) -> AvroResult<Self> {
42        Ok(Self {
43            original_schema: schema,
44        })
45    }
46}
47
48/// How to convert the map type from the input encoding to RisingWave's datatype.
49///
50/// XXX: Should this be `avro.map.handling.mode`? Can it be shared between Avro and Protobuf?
51#[derive(Debug, Copy, Clone)]
52pub enum MapHandling {
53    Jsonb,
54    Map,
55}
56
57impl MapHandling {
58    pub const OPTION_KEY: &'static str = "map.handling.mode";
59
60    pub fn from_options(
61        options: &std::collections::BTreeMap<String, String>,
62    ) -> anyhow::Result<Option<Self>> {
63        let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
64            Some("jsonb") => Self::Jsonb,
65            Some("map") => Self::Map,
66            Some(v) => bail!("unrecognized {} value {}", Self::OPTION_KEY, v),
67            None => return Ok(None),
68        };
69        Ok(Some(mode))
70    }
71}
72
73/// This function expects original schema (with `Ref`).
74/// TODO: change `map_handling` to some `Config`, and also unify debezium.
75pub fn avro_schema_to_fields(
76    schema: &Schema,
77    map_handling: Option<MapHandling>,
78) -> anyhow::Result<Vec<Field>> {
79    let resolved = ResolvedSchema::try_from(schema)?;
80    let mut ancestor_records: Vec<String> = vec![];
81    let root_type = avro_type_mapping(
82        schema,
83        &mut ancestor_records,
84        resolved.get_names(),
85        map_handling,
86    )?;
87    let DataType::Struct(root_struct) = root_type else {
88        bail!("schema invalid, record type required at top level of the schema.");
89    };
90    let fields = root_struct
91        .iter()
92        .map(|(name, data_type)| Field::new(name, data_type.clone()))
93        .collect();
94    Ok(fields)
95}
96
97const DBZ_VARIABLE_SCALE_DECIMAL_NAME: &str = "VariableScaleDecimal";
98const DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE: &str = "io.debezium.data";
99
100/// This function expects original schema (with `Ref`).
101fn avro_type_mapping(
102    schema: &Schema,
103    ancestor_records: &mut Vec<String>,
104    refs: &NamesRef<'_>,
105    map_handling: Option<MapHandling>,
106) -> anyhow::Result<DataType> {
107    let data_type = match schema {
108        Schema::String => DataType::Varchar,
109        Schema::Int => DataType::Int32,
110        Schema::Long => DataType::Int64,
111        Schema::Boolean => DataType::Boolean,
112        Schema::Float => DataType::Float32,
113        Schema::Double => DataType::Float64,
114        Schema::Decimal(DecimalSchema { precision, .. }) => {
115            if *precision > Decimal::MAX_PRECISION.into() {
116                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
117                    LazyLock::new(LogSuppresser::default);
118                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
119                    tracing::warn!(
120                        suppressed_count,
121                        "RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
122                        Decimal::MAX_PRECISION,
123                        precision
124                    );
125                }
126            }
127            DataType::Decimal
128        }
129        Schema::Date => DataType::Date,
130        Schema::LocalTimestampMillis => DataType::Timestamp,
131        Schema::LocalTimestampMicros => DataType::Timestamp,
132        Schema::TimestampMillis => DataType::Timestamptz,
133        Schema::TimestampMicros => DataType::Timestamptz,
134        Schema::Duration => DataType::Interval,
135        Schema::Bytes => DataType::Bytea,
136        Schema::Enum { .. } => DataType::Varchar,
137        Schema::TimeMillis => DataType::Time,
138        Schema::TimeMicros => DataType::Time,
139        Schema::Record(RecordSchema { fields, name, .. }) => {
140            if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
141                && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
142            {
143                return Ok(DataType::Decimal);
144            }
145
146            let unique_name = name.fullname(None);
147            if ancestor_records.contains(&unique_name) {
148                bail!(
149                    "circular reference detected in Avro schema: {} -> {}",
150                    ancestor_records.join(" -> "),
151                    unique_name
152                );
153            }
154
155            ancestor_records.push(unique_name);
156            let ty = StructType::new(
157                fields
158                    .iter()
159                    .map(|f| {
160                        Ok((
161                            &f.name,
162                            avro_type_mapping(&f.schema, ancestor_records, refs, map_handling)?,
163                        ))
164                    })
165                    .collect::<anyhow::Result<Vec<_>>>()?,
166            )
167            .into();
168            ancestor_records.pop();
169            ty
170        }
171        Schema::Array(item_schema) => {
172            let item_type =
173                avro_type_mapping(item_schema.as_ref(), ancestor_records, refs, map_handling)?;
174            DataType::List(Box::new(item_type))
175        }
176        Schema::Union(union_schema) => {
177            // Note: Unions may not immediately contain other unions. So a `null` must represent a top-level null.
178            // e.g., ["null", ["null", "string"]] is not allowed
179
180            // Note: Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum.
181            // https://avro.apache.org/docs/1.11.1/specification/_print/#unions
182            debug_assert!(
183                union_schema
184                    .variants()
185                    .iter()
186                    .map(Schema::canonical_form) // Schema doesn't implement Eq, but only PartialEq.
187                    .duplicates()
188                    .next()
189                    .is_none(),
190                "Union contains duplicate types: {union_schema:?}",
191            );
192            match get_nullable_union_inner(union_schema) {
193                Some(inner) => avro_type_mapping(inner, ancestor_records, refs, map_handling)?,
194                None => {
195                    // Convert the union to a struct, each field of the struct represents a variant of the union.
196                    // Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect.
197                    // Note: Avro union's variant tag is type name, not field name (unlike Rust enum, or Protobuf oneof).
198
199                    // XXX: do we need to introduce union.handling.mode?
200                    let fields = union_schema
201                        .variants()
202                        .iter()
203                        // null will mean the whole struct is null
204                        .filter(|variant| !matches!(variant, &&Schema::Null))
205                        .map(|variant| {
206                            avro_type_mapping(variant, ancestor_records, refs, map_handling)
207                                .and_then(|t| {
208                                    let name = avro_schema_to_struct_field_name(variant)?;
209                                    Ok((name, t))
210                                })
211                        })
212                        .try_collect::<_, Vec<_>, _>()
213                        .context("failed to convert Avro union to struct")?;
214
215                    StructType::new(fields).into()
216                }
217            }
218        }
219        Schema::Ref { name } => {
220            if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
221                && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
222            {
223                DataType::Decimal
224            } else {
225                avro_type_mapping(
226                    refs[name], // `ResolvedSchema::try_from` already handles lookup failure
227                    ancestor_records,
228                    refs,
229                    map_handling,
230                )?
231            }
232        }
233        Schema::Map(value_schema) => {
234            // TODO: support native map type
235            match map_handling {
236                Some(MapHandling::Jsonb) => {
237                    if supported_avro_to_json_type(value_schema) {
238                        DataType::Jsonb
239                    } else {
240                        bail_not_implemented!(
241                            issue = 16963,
242                            "Avro map type to jsonb: {:?}",
243                            schema
244                        );
245                    }
246                }
247                Some(MapHandling::Map) | None => {
248                    let value = avro_type_mapping(
249                        value_schema.as_ref(),
250                        ancestor_records,
251                        refs,
252                        map_handling,
253                    )
254                    .context("failed to convert Avro map type")?;
255                    DataType::Map(MapType::from_kv(DataType::Varchar, value))
256                }
257            }
258        }
259        Schema::Uuid => DataType::Varchar,
260        Schema::Null | Schema::Fixed(_) => {
261            bail_not_implemented!("Avro type: {:?}", schema);
262        }
263    };
264
265    Ok(data_type)
266}
267
268/// Check for [`super::avro_to_jsonb`]
269fn supported_avro_to_json_type(schema: &Schema) -> bool {
270    match schema {
271        Schema::Null | Schema::Boolean | Schema::Int | Schema::String => true,
272
273        Schema::Map(value_schema) | Schema::Array(value_schema) => {
274            supported_avro_to_json_type(value_schema)
275        }
276        Schema::Record(RecordSchema { fields, .. }) => fields
277            .iter()
278            .all(|f| supported_avro_to_json_type(&f.schema)),
279        Schema::Long
280        | Schema::Float
281        | Schema::Double
282        | Schema::Bytes
283        | Schema::Enum(_)
284        | Schema::Fixed(_)
285        | Schema::Decimal(_)
286        | Schema::Uuid
287        | Schema::Date
288        | Schema::TimeMillis
289        | Schema::TimeMicros
290        | Schema::TimestampMillis
291        | Schema::TimestampMicros
292        | Schema::LocalTimestampMillis
293        | Schema::LocalTimestampMicros
294        | Schema::Duration
295        | Schema::Ref { name: _ }
296        | Schema::Union(_) => false,
297    }
298}
299
300/// The field name when converting Avro union type to RisingWave struct type.
301pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result<String, NotImplemented> {
302    Ok(match schema {
303        Schema::Null => unreachable!(),
304        Schema::Union(_) => unreachable!(),
305        // Primitive types
306        Schema::Boolean => "boolean".to_owned(),
307        Schema::Int => "int".to_owned(),
308        Schema::Long => "long".to_owned(),
309        Schema::Float => "float".to_owned(),
310        Schema::Double => "double".to_owned(),
311        Schema::Bytes => "bytes".to_owned(),
312        Schema::String => "string".to_owned(),
313        // Unnamed Complex types
314        Schema::Array(_) => "array".to_owned(),
315        Schema::Map(_) => "map".to_owned(),
316        // Named Complex types
317        Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => {
318            // schema.name().unwrap().fullname(None)
319            // See test_avro_lib_union_record_bug
320            // https://github.com/risingwavelabs/risingwave/issues/17632
321            bail_not_implemented!(issue=17632, "Avro named type used in Union type: {:?}", schema)
322
323        }
324
325        // Logical types are currently banned. See https://github.com/risingwavelabs/risingwave/issues/17616
326
327/*
328        Schema::Uuid => "uuid".to_string(),
329        // Decimal is the most tricky. https://avro.apache.org/docs/1.11.1/specification/_print/#decimal
330        // - A decimal logical type annotates Avro bytes _or_ fixed types.
331        // - It has attributes `precision` and `scale`.
332        //  "For the purposes of schema resolution, two schemas that are decimal logical types match if their scales and precisions match."
333        // - When the physical type is fixed, it's a named type. And a schema containing 2 decimals is possible:
334        //   [
335        //     {"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
336        //     {"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
337        //   ]
338        //   In this case (a logical type's physical type is a named type), perhaps we should use the physical type's `name`.
339        Schema::Decimal(_) => "decimal".to_string(),
340        Schema::Date => "date".to_string(),
341        // Note: in Avro, the name style is "time-millis", etc.
342        // But in RisingWave (Postgres), it will require users to use quotes, i.e.,
343        // select (struct)."time-millis", (struct).time_millies from t;
344        // The latter might be more user-friendly.
345        Schema::TimeMillis => "time_millis".to_string(),
346        Schema::TimeMicros => "time_micros".to_string(),
347        Schema::TimestampMillis => "timestamp_millis".to_string(),
348        Schema::TimestampMicros => "timestamp_micros".to_string(),
349        Schema::LocalTimestampMillis => "local_timestamp_millis".to_string(),
350        Schema::LocalTimestampMicros => "local_timestamp_micros".to_string(),
351        Schema::Duration => "duration".to_string(),
352*/
353        Schema::Uuid
354        | Schema::Decimal(_)
355        | Schema::Date
356        | Schema::TimeMillis
357        | Schema::TimeMicros
358        | Schema::TimestampMillis
359        | Schema::TimestampMicros
360        | Schema::LocalTimestampMillis
361        | Schema::LocalTimestampMicros
362        | Schema::Duration => {
363            bail_not_implemented!(issue=17616, "Avro logicalType used in Union type: {:?}", schema)
364        }
365    })
366}