// risingwave_connector/schema/protobuf.rs
use std::collections::BTreeMap;
use anyhow::Context as _;
use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
use prost_types::FileDescriptorSet;
use risingwave_connector_codec::common::protobuf::compile_pb;
use super::loader::{LoadedSchema, SchemaLoader};
use super::schema_registry::Subject;
use super::{
invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
};
use crate::connector_common::AwsAuthProps;
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};
/// Resolves the protobuf message descriptor described by `format_options`.
///
/// The option `MESSAGE_NAME_KEY` is mandatory, and exactly one of
/// `SCHEMA_LOCATION_KEY` / `SCHEMA_REGISTRY_KEY` must be present:
///
/// - With `SCHEMA_REGISTRY_KEY`, the lookup is delegated to
///   [`fetch_from_registry`] (the only path that uses `topic`) and the id it
///   returns is passed back as `Some(id)`.
/// - With `SCHEMA_LOCATION_KEY`, a [`ProtobufParserConfig`] is built from the
///   location and its message descriptor is returned together with `None`.
///
/// # Errors
///
/// Returns an invalid-option error when the message name is missing, when both
/// or neither of the schema source options are given, or when the location
/// starts with `s3` while `aws_auth_props` is `None`. Failures from the
/// registry path are propagated as-is; failures from building the parser
/// config are wrapped in [`SchemaFetchError::YetToMigrate`].
pub async fn fetch_descriptor(
    format_options: &BTreeMap<String, String>,
    topic: &str,
    aws_auth_props: Option<&AwsAuthProps>,
) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
    // The message name is validated first, before looking at the schema source.
    let message_name = match format_options.get(MESSAGE_NAME_KEY) {
        Some(name) => name.clone(),
        None => return Err(invalid_option_error!("{MESSAGE_NAME_KEY} required").into()),
    };

    let uses_registry = format_options.contains_key(SCHEMA_REGISTRY_KEY);
    let row_schema_location = match format_options.get(SCHEMA_LOCATION_KEY) {
        Some(_) if uses_registry => {
            return Err(invalid_option_error!(
                "cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
            )
            .into());
        }
        Some(url) => url.clone(),
        None if uses_registry => {
            // Registry-backed schemas are resolved entirely by the helper.
            let (descriptor, schema_id) =
                fetch_from_registry(&message_name, format_options, topic).await?;
            return Ok((descriptor, Some(schema_id)));
        }
        None => {
            return Err(invalid_option_error!(
                "requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
            )
            .into());
        }
    };

    // An `s3` location is only accepted when AWS auth properties were supplied.
    if aws_auth_props.is_none() && row_schema_location.starts_with("s3") {
        return Err(invalid_option_error!("s3 URL not supported yet").into());
    }

    let protobuf_props = ProtobufProperties {
        use_schema_registry: false,
        row_schema_location,
        message_name,
        aws_auth_props: aws_auth_props.cloned(),
        ..Default::default()
    };
    let parser_config = ProtobufParserConfig::new(EncodingProperties::Protobuf(protobuf_props))
        .await
        .map_err(SchemaFetchError::YetToMigrate)?;

    Ok((parser_config.message_descriptor, None))
}
pub async fn fetch_from_registry(
message_name: &str,
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
let loader = SchemaLoader::from_format_options(topic, format_options)?;
let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
Ok((
vpb.parent_pool().get_message_by_name(message_name).unwrap(),
vid,
))
}
impl LoadedSchema for FileDescriptor {
    /// Compiles `primary` together with its `references` into a descriptor
    /// pool and returns the file descriptor registered under the primary
    /// subject's name.
    ///
    /// Every failure along the way (compilation, pool construction, or the
    /// primary file being absent from the pool) is annotated with context and
    /// reported as [`SchemaFetchError::SchemaCompile`].
    fn compile(primary: Subject, references: Vec<Subject>) -> Result<Self, SchemaFetchError> {
        // Keep the name around: `primary` is moved into the compilation step.
        let file_name = primary.name.clone();

        compile_pb_subject(primary, references)
            .context("failed to compile protobuf schema into fd set")
            .and_then(|fd_set| {
                DescriptorPool::from_file_descriptor_set(fd_set)
                    .context("failed to convert fd set to descriptor pool")
            })
            .and_then(|pool| {
                pool.get_file_by_name(&file_name)
                    .context("file lost after compilation")
            })
            .map_err(|e| SchemaFetchError::SchemaCompile(e.into()))
    }
}
/// Feeds the primary subject and its dependencies to [`compile_pb`] as
/// `(name, schema content)` pairs.
///
/// Any compilation failure is converted into
/// [`SchemaFetchError::SchemaCompile`].
fn compile_pb_subject(
    primary_subject: Subject,
    dependency_subjects: Vec<Subject>,
) -> Result<FileDescriptorSet, SchemaFetchError> {
    let main_file = (
        primary_subject.name.clone(),
        primary_subject.schema.content.clone(),
    );
    let dependency_files = dependency_subjects
        .into_iter()
        .map(|dep| (dep.name.clone(), dep.schema.content.clone()));

    match compile_pb(main_file, dependency_files) {
        Ok(fd_set) => Ok(fd_set),
        Err(err) => Err(SchemaFetchError::SchemaCompile(err.into())),
    }
}