risingwave_connector/parser/avro/
parser.rsuse std::fmt::Debug;
use std::sync::Arc;
use anyhow::Context;
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::{bail, try_match_expand};
use risingwave_connector_codec::decoder::avro::{
avro_schema_to_column_descs, AvroAccess, AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::plan_common::ColumnDesc;
use super::{ConfluentSchemaCache, GlueSchemaCache as _, GlueSchemaCacheImpl};
use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{
AccessBuilder, AvroProperties, EncodingProperties, MapHandling, SchemaLocation,
};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<ResolvedAvroSchema>,
writer_schema_cache: WriterSchemaCache,
value: Option<Value>,
}
impl AccessBuilder for AvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::create(&self.schema.resolved_schema),
)))
}
}
impl AvroAccessBuilder {
pub fn new(config: AvroParserConfig) -> ConnectorResult<Self> {
let AvroParserConfig {
schema,
writer_schema_cache,
..
} = config;
Ok(Self {
schema,
writer_schema_cache,
value: None,
})
}
async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult<Option<Value>> {
match &self.writer_schema_cache {
WriterSchemaCache::Confluent(resolver) => {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let writer_schema = resolver.get_by_id(schema_id).await?;
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(&self.schema.original_schema),
)?))
}
WriterSchemaCache::File => {
let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e)?,
None => bail!("avro parse unexpected eof"),
}
}
WriterSchemaCache::Glue(resolver) => {
if payload.len() < 18 {
bail!("payload shorter than 18-byte glue header");
}
if payload[0] != 3 {
bail!(
"Only support glue header version 3 but found {}",
payload[0]
);
}
if payload[1] != 0 {
bail!("Non-zero compression {} not supported", payload[1]);
}
let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
let writer_schema = resolver.get_by_id(schema_version_id).await?;
let mut raw_payload = &payload[18..];
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(&self.schema.original_schema),
)?))
}
}
}
}
#[derive(Debug, Clone)]
pub struct AvroParserConfig {
schema: Arc<ResolvedAvroSchema>,
writer_schema_cache: WriterSchemaCache,
map_handling: Option<MapHandling>,
}
#[derive(Debug, Clone)]
enum WriterSchemaCache {
Confluent(Arc<ConfluentSchemaCache>),
Glue(Arc<GlueSchemaCacheImpl>),
File,
}
impl AvroParserConfig {
pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
let AvroProperties {
schema_location,
record_name,
key_record_name,
map_handling,
} = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
match schema_location {
SchemaLocation::Confluent {
urls: schema_location,
client_config,
name_strategy,
topic,
} => {
let url = handle_sr_list(schema_location.as_str())?;
let client = Client::new(url, &client_config)?;
let resolver = ConfluentSchemaCache::new(client);
if let Some(name) = &key_record_name {
bail!("unused FORMAT ENCODE option: key.message='{name}'");
}
let subject_value = get_subject_by_strategy(
&name_strategy,
topic.as_str(),
record_name.as_deref(),
false,
)?;
tracing::debug!("value subject {subject_value}");
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_value).await?,
)?),
writer_schema_cache: WriterSchemaCache::Confluent(Arc::new(resolver)),
map_handling,
})
}
SchemaLocation::File {
url: schema_location,
aws_auth_props,
} => {
let url = handle_sr_list(schema_location.as_str())?;
let url = url.first().unwrap();
let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
writer_schema_cache: WriterSchemaCache::File,
map_handling,
})
}
SchemaLocation::Glue {
schema_arn,
aws_auth_props,
mock_config,
} => {
let resolver =
GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?;
let schema = resolver.get_by_name(&schema_arn).await?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(schema)?),
writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
map_handling,
})
}
}
}
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
.map_err(Into::into)
}
}
#[cfg(test)]
mod test {
use std::env;
use url::Url;
use super::*;
use crate::connector_common::AwsAuthProps;
fn test_data_path(file_name: &str) -> String {
let curr_dir = env::current_dir().unwrap().into_os_string();
curr_dir.into_string().unwrap() + "/src/test_data/" + file_name
}
#[tokio::test]
#[ignore]
async fn test_load_schema_from_s3() {
let schema_location = "s3://mingchao-schemas/complex-schema.avsc".to_string();
let url = Url::parse(&schema_location).unwrap();
let aws_auth_config: AwsAuthProps =
serde_json::from_str(r#"region":"ap-southeast-1"#).unwrap();
let schema_content = bytes_from_url(&url, Some(&aws_auth_config)).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}
#[tokio::test]
async fn test_load_schema_from_local() {
let schema_location = Url::from_file_path(test_data_path("complex-schema.avsc")).unwrap();
let schema_content = bytes_from_url(&schema_location, None).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}
#[tokio::test]
#[ignore]
async fn test_load_schema_from_https() {
let schema_location =
"https://mingchao-schemas.s3.ap-southeast-1.amazonaws.com/complex-schema.avsc";
let url = Url::parse(schema_location).unwrap();
let schema_content = bytes_from_url(&url, None).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}
}