// risingwave_connector/parser/avro/parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::sync::Arc;

use anyhow::Context;
use apache_avro::types::Value;
use apache_avro::{Reader, Schema, from_avro_datum};
use risingwave_common::catalog::Field;
use risingwave_common::{bail, try_match_expand};
use risingwave_connector_codec::decoder::avro::{
    AvroAccess, AvroParseOptions, ResolvedAvroSchema, avro_schema_to_fields,
};

use super::{ConfluentSchemaCache, GlueSchemaCache as _, GlueSchemaCacheImpl};
use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
use crate::parser::utils::bytes_from_url;
use crate::parser::{
    AccessBuilder, AvroProperties, EncodingProperties, MapHandling, SchemaLocation,
};
use crate::schema::schema_registry::{
    Client, extract_schema_id, get_subject_by_strategy, handle_sr_list,
};
use crate::source::SourceMeta;

39// Default avro access builder
40#[derive(Debug)]
41pub struct AvroAccessBuilder {
42    schema: Arc<ResolvedAvroSchema>,
43    /// Refer to [`AvroParserConfig::writer_schema_cache`].
44    writer_schema_cache: WriterSchemaCache,
45    value: Option<Value>,
46}
47
48impl AccessBuilder for AvroAccessBuilder {
49    async fn generate_accessor(
50        &mut self,
51        payload: Vec<u8>,
52        source_meta: &SourceMeta,
53    ) -> ConnectorResult<AccessImpl<'_>> {
54        self.value = self.parse_avro_value(&payload, source_meta).await?;
55        Ok(AccessImpl::Avro(AvroAccess::new(
56            self.value.as_ref().unwrap(),
57            AvroParseOptions::create(&self.schema.original_schema),
58        )))
59    }
60}
61
62impl AvroAccessBuilder {
63    pub fn new(config: AvroParserConfig) -> ConnectorResult<Self> {
64        let AvroParserConfig {
65            schema,
66            writer_schema_cache,
67            ..
68        } = config;
69        Ok(Self {
70            schema,
71            writer_schema_cache,
72            value: None,
73        })
74    }
75
76    /// Note: we should use unresolved schema to parsing bytes into avro value.
77    /// Otherwise it's an invalid schema and parsing will fail. (Avro error: Two named schema defined for same fullname)
78    ///
79    /// # Notes about how Avro data looks like
80    ///
81    /// First, it has two [serialization encodings: binary and JSON](https://avro.apache.org/docs/1.11.1/specification/#encodings).
82    /// They don't have magic bytes and cannot be distinguished on their own.
83    ///
84    /// But in different cases, it starts with different headers, or magic bytes, which can be confusing.
85    ///
86    /// ## `apache_avro` API and headers
87    ///
88    /// - `apache_avro::Reader`: [Object Container Files](https://avro.apache.org/docs/1.11.1/specification/#object-container-files): contains file header, starting with 4 bytes `Obj1`. This is a batch file encoding. We don't use it.
89    /// - `apache_avro::GenericSingleObjectReader`: [Single-object encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding): starts with 2 bytes `0xC301`. This is designed to be used in places like Kafka, but Confluent schema registry doesn't use it.
90    /// - `apache_avro::from_avro_datum`: no header, binary encoding. This is what we should use.
91    ///
92    /// ## Confluent schema registry
93    ///
94    /// - In Kafka ([Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format)):
95    ///   starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding.
96    async fn parse_avro_value(
97        &self,
98        payload: &[u8],
99        _source_meta: &SourceMeta,
100    ) -> ConnectorResult<Option<Value>> {
101        // parse payload to avro value
102        // if use confluent schema, get writer schema from confluent schema registry
103        match &self.writer_schema_cache {
104            WriterSchemaCache::Confluent(resolver) => {
105                let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
106                let writer_schema = resolver.get_by_id(schema_id).await?;
107                Ok(Some(from_avro_datum(
108                    writer_schema.as_ref(),
109                    &mut raw_payload,
110                    Some(&self.schema.original_schema),
111                )?))
112            }
113            WriterSchemaCache::File => {
114                // FIXME: we should not use `Reader` (file header) here. See comment above and https://github.com/risingwavelabs/risingwave/issues/12871
115                let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
116                match reader.next() {
117                    Some(Ok(v)) => Ok(Some(v)),
118                    Some(Err(e)) => Err(e)?,
119                    None => bail!("avro parse unexpected eof"),
120                }
121            }
122            WriterSchemaCache::Glue(resolver) => {
123                // <https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.20/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L59-L61>
124                // byte 0:      header version = 3
125                // byte 1:      compression: 0 = no compression; 5 = zlib (unsupported)
126                // byte 2..=17: 16-byte UUID as schema version id
127                // byte 18..:   raw avro payload
128                if payload.len() < 18 {
129                    bail!("payload shorter than 18-byte glue header");
130                }
131                if payload[0] != 3 {
132                    bail!(
133                        "Only support glue header version 3 but found {}",
134                        payload[0]
135                    );
136                }
137                if payload[1] != 0 {
138                    bail!("Non-zero compression {} not supported", payload[1]);
139                }
140                let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
141                let writer_schema = resolver.get_by_id(schema_version_id).await?;
142                let mut raw_payload = &payload[18..];
143                Ok(Some(from_avro_datum(
144                    writer_schema.as_ref(),
145                    &mut raw_payload,
146                    Some(&self.schema.original_schema),
147                )?))
148            }
149        }
150    }
151}
152
153#[derive(Debug, Clone)]
154pub struct AvroParserConfig {
155    schema: Arc<ResolvedAvroSchema>,
156    /// Writer schema is the schema used to write the data. When parsing Avro data, the exactly same schema
157    /// must be used to decode the message, and then convert it with the reader schema.
158    writer_schema_cache: WriterSchemaCache,
159
160    map_handling: Option<MapHandling>,
161}
162
163#[derive(Debug, Clone)]
164enum WriterSchemaCache {
165    Confluent(Arc<ConfluentSchemaCache>),
166    Glue(Arc<GlueSchemaCacheImpl>),
167    File,
168}
169
170impl AvroParserConfig {
171    pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
172        let AvroProperties {
173            schema_location,
174            record_name,
175            key_record_name,
176            map_handling,
177        } = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
178        match schema_location {
179            SchemaLocation::Confluent {
180                urls: schema_location,
181                client_config,
182                name_strategy,
183                topic,
184            } => {
185                let url = handle_sr_list(schema_location.as_str())?;
186                let client = Client::new(url, &client_config)?;
187                let resolver = ConfluentSchemaCache::new(client);
188
189                if let Some(name) = &key_record_name {
190                    bail!("unused FORMAT ENCODE option: key.message='{name}'");
191                }
192                let subject_value = get_subject_by_strategy(
193                    &name_strategy,
194                    topic.as_str(),
195                    record_name.as_deref(),
196                    false,
197                )?;
198                tracing::debug!("value subject {subject_value}");
199
200                Ok(Self {
201                    schema: Arc::new(ResolvedAvroSchema::create(
202                        resolver.get_by_subject(&subject_value).await?,
203                    )?),
204                    writer_schema_cache: WriterSchemaCache::Confluent(Arc::new(resolver)),
205                    map_handling,
206                })
207            }
208            SchemaLocation::File {
209                url: schema_location,
210                aws_auth_props,
211            } => {
212                let url = handle_sr_list(schema_location.as_str())?;
213                let url = url.first().unwrap();
214                let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
215                let schema = Schema::parse_reader(&mut schema_content.as_slice())
216                    .context("failed to parse avro schema")?;
217                Ok(Self {
218                    schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
219                    writer_schema_cache: WriterSchemaCache::File,
220                    map_handling,
221                })
222            }
223            SchemaLocation::Glue {
224                schema_arn,
225                aws_auth_props,
226                mock_config,
227            } => {
228                let resolver =
229                    GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?;
230                let schema = resolver.get_by_name(&schema_arn).await?;
231                Ok(Self {
232                    schema: Arc::new(ResolvedAvroSchema::create(schema)?),
233                    writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
234                    map_handling,
235                })
236            }
237        }
238    }
239
240    pub fn map_to_columns(&self) -> ConnectorResult<Vec<Field>> {
241        avro_schema_to_fields(&self.schema.original_schema, self.map_handling).map_err(Into::into)
242    }
243}
244
245#[cfg(test)]
246mod test {
247    use std::env;
248
249    use url::Url;
250
251    use super::*;
252    use crate::connector_common::AwsAuthProps;
253
254    fn test_data_path(file_name: &str) -> String {
255        let curr_dir = env::current_dir().unwrap().into_os_string();
256        curr_dir.into_string().unwrap() + "/src/test_data/" + file_name
257    }
258
259    #[tokio::test]
260    #[ignore]
261    async fn test_load_schema_from_s3() {
262        let schema_location = "s3://mingchao-schemas/complex-schema.avsc".to_owned();
263        let url = Url::parse(&schema_location).unwrap();
264        let aws_auth_config: AwsAuthProps =
265            serde_json::from_str(r#"region":"ap-southeast-1"#).unwrap();
266        let schema_content = bytes_from_url(&url, Some(&aws_auth_config)).await;
267        assert!(schema_content.is_ok());
268        let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
269        assert!(schema.is_ok());
270        println!("schema = {:?}", schema.unwrap());
271    }
272
273    #[tokio::test]
274    async fn test_load_schema_from_local() {
275        let schema_location = Url::from_file_path(test_data_path("complex-schema.avsc")).unwrap();
276        let schema_content = bytes_from_url(&schema_location, None).await;
277        assert!(schema_content.is_ok());
278        let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
279        assert!(schema.is_ok());
280        println!("schema = {:?}", schema.unwrap());
281    }
282
283    #[tokio::test]
284    #[ignore]
285    async fn test_load_schema_from_https() {
286        let schema_location =
287            "https://mingchao-schemas.s3.ap-southeast-1.amazonaws.com/complex-schema.avsc";
288        let url = Url::parse(schema_location).unwrap();
289        let schema_content = bytes_from_url(&url, None).await;
290        assert!(schema_content.is_ok());
291        let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
292        assert!(schema.is_ok());
293        println!("schema = {:?}", schema.unwrap());
294    }
295}