risingwave_connector/parser/avro/
parser.rs1use std::fmt::Debug;
16use std::sync::Arc;
17
18use anyhow::Context;
19use apache_avro::types::Value;
20use apache_avro::{Reader, Schema, from_avro_datum};
21use risingwave_common::catalog::Field;
22use risingwave_common::{bail, try_match_expand};
23use risingwave_connector_codec::decoder::avro::{
24 AvroAccess, AvroParseOptions, ResolvedAvroSchema, avro_schema_to_fields,
25};
26
27use super::{ConfluentSchemaCache, GlueSchemaCache as _, GlueSchemaCacheImpl};
28use crate::error::ConnectorResult;
29use crate::parser::unified::AccessImpl;
30use crate::parser::utils::bytes_from_url;
31use crate::parser::{
32 AccessBuilder, AvroProperties, EncodingProperties, MapHandling, SchemaLocation,
33};
34use crate::schema::schema_registry::{
35 Client, extract_schema_id, get_subject_by_strategy, handle_sr_list,
36};
37use crate::source::SourceMeta;
38
/// Decodes Avro-encoded payloads and hands out `AccessImpl::Avro` accessors over the
/// decoded value.
#[derive(Debug)]
pub struct AvroAccessBuilder {
    /// Schema the decoded value is resolved against: it is passed as the reader schema to
    /// `from_avro_datum` / `Reader::with_schema` and to `AvroParseOptions::create`.
    schema: Arc<ResolvedAvroSchema>,
    /// Selects how a payload is framed and where its writer schema is looked up.
    writer_schema_cache: WriterSchemaCache,
    /// Most recently decoded value. It is stored on the builder so that the accessor returned
    /// by `generate_accessor` can borrow it.
    value: Option<Value>,
}
47
48impl AccessBuilder for AvroAccessBuilder {
49 async fn generate_accessor(
50 &mut self,
51 payload: Vec<u8>,
52 source_meta: &SourceMeta,
53 ) -> ConnectorResult<AccessImpl<'_>> {
54 self.value = self.parse_avro_value(&payload, source_meta).await?;
55 Ok(AccessImpl::Avro(AvroAccess::new(
56 self.value.as_ref().unwrap(),
57 AvroParseOptions::create(&self.schema.original_schema),
58 )))
59 }
60}
61
62impl AvroAccessBuilder {
63 pub fn new(config: AvroParserConfig) -> ConnectorResult<Self> {
64 let AvroParserConfig {
65 schema,
66 writer_schema_cache,
67 ..
68 } = config;
69 Ok(Self {
70 schema,
71 writer_schema_cache,
72 value: None,
73 })
74 }
75
76 async fn parse_avro_value(
97 &self,
98 payload: &[u8],
99 _source_meta: &SourceMeta,
100 ) -> ConnectorResult<Option<Value>> {
101 match &self.writer_schema_cache {
104 WriterSchemaCache::Confluent(resolver) => {
105 let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
106 let writer_schema = resolver.get_by_id(schema_id).await?;
107 Ok(Some(from_avro_datum(
108 writer_schema.as_ref(),
109 &mut raw_payload,
110 Some(&self.schema.original_schema),
111 )?))
112 }
113 WriterSchemaCache::File => {
114 let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
116 match reader.next() {
117 Some(Ok(v)) => Ok(Some(v)),
118 Some(Err(e)) => Err(e)?,
119 None => bail!("avro parse unexpected eof"),
120 }
121 }
122 WriterSchemaCache::Glue(resolver) => {
123 if payload.len() < 18 {
129 bail!("payload shorter than 18-byte glue header");
130 }
131 if payload[0] != 3 {
132 bail!(
133 "Only support glue header version 3 but found {}",
134 payload[0]
135 );
136 }
137 if payload[1] != 0 {
138 bail!("Non-zero compression {} not supported", payload[1]);
139 }
140 let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
141 let writer_schema = resolver.get_by_id(schema_version_id).await?;
142 let mut raw_payload = &payload[18..];
143 Ok(Some(from_avro_datum(
144 writer_schema.as_ref(),
145 &mut raw_payload,
146 Some(&self.schema.original_schema),
147 )?))
148 }
149 }
150 }
151}
152
/// Avro parser configuration resolved from `EncodingProperties::Avro`; an
/// [`AvroAccessBuilder`] is constructed from it.
#[derive(Debug, Clone)]
pub struct AvroParserConfig {
    /// Schema fetched while building the config (by registry subject, from a file/URL, or by
    /// Glue schema ARN, depending on the schema location).
    schema: Arc<ResolvedAvroSchema>,
    /// Selects how payloads are framed and where their writer schema is looked up.
    writer_schema_cache: WriterSchemaCache,

    /// Forwarded to `avro_schema_to_fields` by `map_to_columns`.
    map_handling: Option<MapHandling>,
}
162
/// Where the writer schema of an incoming payload comes from.
#[derive(Debug, Clone)]
enum WriterSchemaCache {
    /// The payload carries a schema id (split off by `extract_schema_id`) that is resolved
    /// through this cache.
    Confluent(Arc<ConfluentSchemaCache>),
    /// The payload starts with an 18-byte header whose bytes 2..18 hold the UUID of the schema
    /// version, which is resolved through this cache.
    Glue(Arc<GlueSchemaCacheImpl>),
    /// No lookup is performed: the payload is read with `Reader::with_schema` using the
    /// configured schema.
    File,
}
169
170impl AvroParserConfig {
171 pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
172 let AvroProperties {
173 schema_location,
174 record_name,
175 key_record_name,
176 map_handling,
177 } = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
178 match schema_location {
179 SchemaLocation::Confluent {
180 urls: schema_location,
181 client_config,
182 name_strategy,
183 topic,
184 } => {
185 let url = handle_sr_list(schema_location.as_str())?;
186 let client = Client::new(url, &client_config)?;
187 let resolver = ConfluentSchemaCache::new(client);
188
189 if let Some(name) = &key_record_name {
190 bail!("unused FORMAT ENCODE option: key.message='{name}'");
191 }
192 let subject_value = get_subject_by_strategy(
193 &name_strategy,
194 topic.as_str(),
195 record_name.as_deref(),
196 false,
197 )?;
198 tracing::debug!("value subject {subject_value}");
199
200 Ok(Self {
201 schema: Arc::new(ResolvedAvroSchema::create(
202 resolver.get_by_subject(&subject_value).await?,
203 )?),
204 writer_schema_cache: WriterSchemaCache::Confluent(Arc::new(resolver)),
205 map_handling,
206 })
207 }
208 SchemaLocation::File {
209 url: schema_location,
210 aws_auth_props,
211 } => {
212 let url = handle_sr_list(schema_location.as_str())?;
213 let url = url.first().unwrap();
214 let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
215 let schema = Schema::parse_reader(&mut schema_content.as_slice())
216 .context("failed to parse avro schema")?;
217 Ok(Self {
218 schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
219 writer_schema_cache: WriterSchemaCache::File,
220 map_handling,
221 })
222 }
223 SchemaLocation::Glue {
224 schema_arn,
225 aws_auth_props,
226 mock_config,
227 } => {
228 let resolver =
229 GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?;
230 let schema = resolver.get_by_name(&schema_arn).await?;
231 Ok(Self {
232 schema: Arc::new(ResolvedAvroSchema::create(schema)?),
233 writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
234 map_handling,
235 })
236 }
237 }
238 }
239
240 pub fn map_to_columns(&self) -> ConnectorResult<Vec<Field>> {
241 avro_schema_to_fields(&self.schema.original_schema, self.map_handling).map_err(Into::into)
242 }
243}
244
#[cfg(test)]
mod test {
    use std::env;
    use std::path::PathBuf;

    use url::Url;

    use super::*;
    use crate::connector_common::AwsAuthProps;

    /// Path of a fixture under `src/test_data/`, resolved against the current working
    /// directory.
    fn test_data_path(file_name: &str) -> PathBuf {
        env::current_dir()
            .unwrap()
            .join("src/test_data")
            .join(file_name)
    }

    /// Loads and parses a schema from S3.
    // NOTE(review): presumably ignored because it needs AWS access — confirm.
    #[tokio::test]
    #[ignore]
    async fn test_load_schema_from_s3() {
        let schema_location = "s3://mingchao-schemas/complex-schema.avsc".to_owned();
        let url = Url::parse(&schema_location).unwrap();
        // The auth props must be a JSON object; a bare `"key":"value"` pair is not valid
        // JSON and would make `from_str` fail before S3 is ever contacted.
        let aws_auth_config: AwsAuthProps =
            serde_json::from_str(r#"{"region":"ap-southeast-1"}"#).unwrap();
        let schema_content = bytes_from_url(&url, Some(&aws_auth_config))
            .await
            .expect("failed to load schema from s3");
        let schema = Schema::parse_reader(&mut schema_content.as_slice())
            .expect("failed to parse avro schema");
        println!("schema = {:?}", schema);
    }

    /// Loads and parses the schema fixture from the local file system.
    #[tokio::test]
    async fn test_load_schema_from_local() {
        let schema_location = Url::from_file_path(test_data_path("complex-schema.avsc")).unwrap();
        let schema_content = bytes_from_url(&schema_location, None)
            .await
            .expect("failed to load schema from local file");
        let schema = Schema::parse_reader(&mut schema_content.as_slice())
            .expect("failed to parse avro schema");
        println!("schema = {:?}", schema);
    }

    /// Loads and parses a schema over HTTPS.
    // NOTE(review): presumably ignored because it needs network access — confirm.
    #[tokio::test]
    #[ignore]
    async fn test_load_schema_from_https() {
        let schema_location =
            "https://mingchao-schemas.s3.ap-southeast-1.amazonaws.com/complex-schema.avsc";
        let url = Url::parse(schema_location).unwrap();
        let schema_content = bytes_from_url(&url, None)
            .await
            .expect("failed to load schema over https");
        let schema = Schema::parse_reader(&mut schema_content.as_slice())
            .expect("failed to parse avro schema");
        println!("schema = {:?}", schema);
    }
}