risingwave_connector_codec/decoder/avro/
schema.rs1use std::sync::{Arc, LazyLock};
16
17use anyhow::Context;
18use apache_avro::AvroResult;
19use apache_avro::schema::{DecimalSchema, NamesRef, RecordSchema, ResolvedSchema, Schema};
20use itertools::Itertools;
21use risingwave_common::catalog::Field;
22use risingwave_common::error::NotImplemented;
23use risingwave_common::log::LogSuppresser;
24use risingwave_common::types::{DataType, Decimal, MapType, StructType};
25use risingwave_common::{bail, bail_not_implemented};
26
27use super::get_nullable_union_inner;
28
29#[derive(Debug)]
35pub struct ResolvedAvroSchema {
36 pub original_schema: Arc<Schema>,
38}
39
40impl ResolvedAvroSchema {
41 pub fn create(schema: Arc<Schema>) -> AvroResult<Self> {
42 Ok(Self {
43 original_schema: schema,
44 })
45 }
46}
47
/// How an Avro `map` type is represented in RisingWave.
///
/// Chosen through the `map.handling.mode` option (see `MapHandling::from_options`).
#[derive(Clone, Copy, Debug)]
pub enum MapHandling {
    /// Represent the whole map as a single `jsonb` value (option value `"jsonb"`).
    Jsonb,
    /// Represent it as a RisingWave map type with `varchar` keys (option value `"map"`).
    Map,
}
56
57impl MapHandling {
58 pub const OPTION_KEY: &'static str = "map.handling.mode";
59
60 pub fn from_options(
61 options: &std::collections::BTreeMap<String, String>,
62 ) -> anyhow::Result<Option<Self>> {
63 let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
64 Some("jsonb") => Self::Jsonb,
65 Some("map") => Self::Map,
66 Some(v) => bail!("unrecognized {} value {}", Self::OPTION_KEY, v),
67 None => return Ok(None),
68 };
69 Ok(Some(mode))
70 }
71}
72
73pub fn avro_schema_to_fields(
76 schema: &Schema,
77 map_handling: Option<MapHandling>,
78) -> anyhow::Result<Vec<Field>> {
79 let resolved = ResolvedSchema::try_from(schema)?;
80 let mut ancestor_records: Vec<String> = vec![];
81 let root_type = avro_type_mapping(
82 schema,
83 &mut ancestor_records,
84 resolved.get_names(),
85 map_handling,
86 )?;
87 let DataType::Struct(root_struct) = root_type else {
88 bail!("schema invalid, record type required at top level of the schema.");
89 };
90 let fields = root_struct
91 .iter()
92 .map(|(name, data_type)| Field::new(name, data_type.clone()))
93 .collect();
94 Ok(fields)
95}
96
/// Record name Debezium uses for its variable-scale decimal type. A record (or reference)
/// with this name and [`DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE`] maps to `DataType::Decimal`
/// instead of a struct.
const DBZ_VARIABLE_SCALE_DECIMAL_NAME: &'static str = "VariableScaleDecimal";
/// Namespace that, together with [`DBZ_VARIABLE_SCALE_DECIMAL_NAME`], identifies the
/// Debezium variable-scale decimal record.
const DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE: &'static str = "io.debezium.data";
99
100fn avro_type_mapping(
102 schema: &Schema,
103 ancestor_records: &mut Vec<String>,
104 refs: &NamesRef<'_>,
105 map_handling: Option<MapHandling>,
106) -> anyhow::Result<DataType> {
107 let data_type = match schema {
108 Schema::String => DataType::Varchar,
109 Schema::Int => DataType::Int32,
110 Schema::Long => DataType::Int64,
111 Schema::Boolean => DataType::Boolean,
112 Schema::Float => DataType::Float32,
113 Schema::Double => DataType::Float64,
114 Schema::Decimal(DecimalSchema { precision, .. }) => {
115 if *precision > Decimal::MAX_PRECISION.into() {
116 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
117 LazyLock::new(LogSuppresser::default);
118 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
119 tracing::warn!(
120 suppressed_count,
121 "RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
122 Decimal::MAX_PRECISION,
123 precision
124 );
125 }
126 }
127 DataType::Decimal
128 }
129 Schema::Date => DataType::Date,
130 Schema::LocalTimestampMillis => DataType::Timestamp,
131 Schema::LocalTimestampMicros => DataType::Timestamp,
132 Schema::TimestampMillis => DataType::Timestamptz,
133 Schema::TimestampMicros => DataType::Timestamptz,
134 Schema::Duration => DataType::Interval,
135 Schema::Bytes => DataType::Bytea,
136 Schema::Enum { .. } => DataType::Varchar,
137 Schema::TimeMillis => DataType::Time,
138 Schema::TimeMicros => DataType::Time,
139 Schema::Record(RecordSchema { fields, name, .. }) => {
140 if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
141 && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
142 {
143 return Ok(DataType::Decimal);
144 }
145
146 let unique_name = name.fullname(None);
147 if ancestor_records.contains(&unique_name) {
148 bail!(
149 "circular reference detected in Avro schema: {} -> {}",
150 ancestor_records.join(" -> "),
151 unique_name
152 );
153 }
154
155 ancestor_records.push(unique_name);
156 let ty = StructType::new(
157 fields
158 .iter()
159 .map(|f| {
160 Ok((
161 &f.name,
162 avro_type_mapping(&f.schema, ancestor_records, refs, map_handling)?,
163 ))
164 })
165 .collect::<anyhow::Result<Vec<_>>>()?,
166 )
167 .into();
168 ancestor_records.pop();
169 ty
170 }
171 Schema::Array(item_schema) => {
172 let item_type =
173 avro_type_mapping(item_schema.as_ref(), ancestor_records, refs, map_handling)?;
174 DataType::List(Box::new(item_type))
175 }
176 Schema::Union(union_schema) => {
177 debug_assert!(
183 union_schema
184 .variants()
185 .iter()
186 .map(Schema::canonical_form) .duplicates()
188 .next()
189 .is_none(),
190 "Union contains duplicate types: {union_schema:?}",
191 );
192 match get_nullable_union_inner(union_schema) {
193 Some(inner) => avro_type_mapping(inner, ancestor_records, refs, map_handling)?,
194 None => {
195 let fields = union_schema
201 .variants()
202 .iter()
203 .filter(|variant| !matches!(variant, &&Schema::Null))
205 .map(|variant| {
206 avro_type_mapping(variant, ancestor_records, refs, map_handling)
207 .and_then(|t| {
208 let name = avro_schema_to_struct_field_name(variant)?;
209 Ok((name, t))
210 })
211 })
212 .try_collect::<_, Vec<_>, _>()
213 .context("failed to convert Avro union to struct")?;
214
215 StructType::new(fields).into()
216 }
217 }
218 }
219 Schema::Ref { name } => {
220 if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
221 && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
222 {
223 DataType::Decimal
224 } else {
225 avro_type_mapping(
226 refs[name], ancestor_records,
228 refs,
229 map_handling,
230 )?
231 }
232 }
233 Schema::Map(value_schema) => {
234 match map_handling {
236 Some(MapHandling::Jsonb) => {
237 if supported_avro_to_json_type(value_schema) {
238 DataType::Jsonb
239 } else {
240 bail_not_implemented!(
241 issue = 16963,
242 "Avro map type to jsonb: {:?}",
243 schema
244 );
245 }
246 }
247 Some(MapHandling::Map) | None => {
248 let value = avro_type_mapping(
249 value_schema.as_ref(),
250 ancestor_records,
251 refs,
252 map_handling,
253 )
254 .context("failed to convert Avro map type")?;
255 DataType::Map(MapType::from_kv(DataType::Varchar, value))
256 }
257 }
258 }
259 Schema::Uuid => DataType::Varchar,
260 Schema::Null | Schema::Fixed(_) => {
261 bail_not_implemented!("Avro type: {:?}", schema);
262 }
263 };
264
265 Ok(data_type)
266}
267
268fn supported_avro_to_json_type(schema: &Schema) -> bool {
270 match schema {
271 Schema::Null | Schema::Boolean | Schema::Int | Schema::String => true,
272
273 Schema::Map(value_schema) | Schema::Array(value_schema) => {
274 supported_avro_to_json_type(value_schema)
275 }
276 Schema::Record(RecordSchema { fields, .. }) => fields
277 .iter()
278 .all(|f| supported_avro_to_json_type(&f.schema)),
279 Schema::Long
280 | Schema::Float
281 | Schema::Double
282 | Schema::Bytes
283 | Schema::Enum(_)
284 | Schema::Fixed(_)
285 | Schema::Decimal(_)
286 | Schema::Uuid
287 | Schema::Date
288 | Schema::TimeMillis
289 | Schema::TimeMicros
290 | Schema::TimestampMillis
291 | Schema::TimestampMicros
292 | Schema::LocalTimestampMillis
293 | Schema::LocalTimestampMicros
294 | Schema::Duration
295 | Schema::Ref { name: _ }
296 | Schema::Union(_) => false,
297 }
298}
299
300pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result<String, NotImplemented> {
302 Ok(match schema {
303 Schema::Null => unreachable!(),
304 Schema::Union(_) => unreachable!(),
305 Schema::Boolean => "boolean".to_owned(),
307 Schema::Int => "int".to_owned(),
308 Schema::Long => "long".to_owned(),
309 Schema::Float => "float".to_owned(),
310 Schema::Double => "double".to_owned(),
311 Schema::Bytes => "bytes".to_owned(),
312 Schema::String => "string".to_owned(),
313 Schema::Array(_) => "array".to_owned(),
315 Schema::Map(_) => "map".to_owned(),
316 Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => {
318 bail_not_implemented!(issue=17632, "Avro named type used in Union type: {:?}", schema)
322
323 }
324
325 Schema::Uuid
354 | Schema::Decimal(_)
355 | Schema::Date
356 | Schema::TimeMillis
357 | Schema::TimeMicros
358 | Schema::TimestampMillis
359 | Schema::TimestampMicros
360 | Schema::LocalTimestampMillis
361 | Schema::LocalTimestampMicros
362 | Schema::Duration => {
363 bail_not_implemented!(issue=17616, "Avro logicalType used in Union type: {:?}", schema)
364 }
365 })
366}