risingwave_connector/schema/
protobuf.rs1use std::collections::BTreeMap;
16
17use anyhow::Context as _;
18use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
19use prost_types::FileDescriptorSet;
20use risingwave_connector_codec::common::protobuf::compile_pb;
21
22use super::loader::{LoadedSchema, SchemaLoader};
23use super::schema_registry::Subject;
24use super::{
25 InvalidOptionError, MESSAGE_NAME_KEY, SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
26 SchemaFetchError, invalid_option_error,
27};
28use crate::connector_common::AwsAuthProps;
29use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};
30
31pub async fn fetch_descriptor(
33 format_options: &BTreeMap<String, String>,
34 topic: &str,
35 aws_auth_props: Option<&AwsAuthProps>,
36) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
37 let message_name = format_options
38 .get(MESSAGE_NAME_KEY)
39 .ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
40 .clone();
41 let schema_location = format_options.get(SCHEMA_LOCATION_KEY);
42 let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY);
43 let row_schema_location = match (schema_location, schema_registry) {
44 (Some(_), Some(_)) => {
45 return Err(invalid_option_error!(
46 "cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
47 )
48 .into());
49 }
50 (None, None) => {
51 return Err(invalid_option_error!(
52 "requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
53 )
54 .into());
55 }
56 (None, Some(_)) => {
57 let (md, sid) = fetch_from_registry(&message_name, format_options, topic).await?;
58 return Ok((md, Some(sid)));
59 }
60 (Some(url), None) => url.clone(),
61 };
62
63 if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
64 return Err(invalid_option_error!("s3 URL not supported yet").into());
65 }
66
67 let enc = EncodingProperties::Protobuf(ProtobufProperties {
68 schema_location: crate::parser::SchemaLocation::File {
69 url: row_schema_location,
70 aws_auth_props: aws_auth_props.cloned(),
71 },
72 message_name,
73 ..Default::default()
75 });
76 let conf = ProtobufParserConfig::new(enc)
81 .await
82 .map_err(SchemaFetchError::YetToMigrate)?;
83 Ok((conf.message_descriptor, None))
84}
85
86pub async fn fetch_from_registry(
87 message_name: &str,
88 format_options: &BTreeMap<String, String>,
89 topic: &str,
90) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
91 let loader = SchemaLoader::from_format_options(topic, format_options).await?;
92
93 let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
94 let vid = match vid {
95 super::SchemaVersion::Confluent(vid) => vid,
96 super::SchemaVersion::Glue(_) => {
97 return Err(
98 invalid_option_error!("Protobuf with Glue Schema Registry unsupported").into(),
99 );
100 }
101 };
102
103 Ok((
104 vpb.parent_pool().get_message_by_name(message_name).unwrap(),
105 vid,
106 ))
107}
108
109impl LoadedSchema for FileDescriptor {
110 fn compile(primary: Subject, references: Vec<Subject>) -> Result<Self, SchemaFetchError> {
111 let primary_name = primary.name.clone();
112
113 match compile_pb_subject(primary, references)
114 .context("failed to compile protobuf schema into fd set")
115 {
116 Err(e) => Err(SchemaFetchError::SchemaCompile(e.into())),
117 Ok(fd_set) => DescriptorPool::from_file_descriptor_set(fd_set)
118 .context("failed to convert fd set to descriptor pool")
119 .and_then(|pool| {
120 pool.get_file_by_name(&primary_name)
121 .context("file lost after compilation")
122 })
123 .map_err(|e| SchemaFetchError::SchemaCompile(e.into())),
124 }
125 }
126}
127
128fn compile_pb_subject(
129 primary_subject: Subject,
130 dependency_subjects: Vec<Subject>,
131) -> Result<FileDescriptorSet, SchemaFetchError> {
132 compile_pb(
133 (
134 primary_subject.name.clone(),
135 primary_subject.schema.content.clone(),
136 ),
137 dependency_subjects
138 .into_iter()
139 .map(|s| (s.name.clone(), s.schema.content.clone())),
140 )
141 .map_err(|e| SchemaFetchError::SchemaCompile(e.into()))
142}