risingwave_connector/schema/
loader.rs1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
18
19use super::schema_registry::{
20 Client, Subject, get_subject_by_strategy, handle_sr_list, name_strategy_from_str,
21};
22use super::{
23 AWS_GLUE_SCHEMA_ARN_KEY, InvalidOptionError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY,
24 MalformedResponseError, NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY, SchemaFetchError,
25 invalid_option_error, malformed_response_error,
26};
27use crate::connector_common::AwsAuthProps;
28
/// Loads and compiles schemas from one of the supported schema registries.
///
/// The backend is chosen by `SchemaLoader::from_format_options`: AWS Glue when the
/// `AWS_GLUE_SCHEMA_ARN_KEY` option is present, otherwise a Confluent schema registry.
pub enum SchemaLoader {
    /// Confluent schema registry backend.
    Confluent(ConfluentSchemaLoader),
    /// AWS Glue schema registry backend (real or mocked).
    Glue(GlueSchemaLoader),
}
33
/// Loads schemas from a Confluent schema registry.
///
/// The subject is derived from `topic`, `name_strategy` and the optional record name
/// (key or value) via `get_subject_by_strategy`.
pub struct ConfluentSchemaLoader {
    /// Registry client; `from_format_options` builds it from the `SCHEMA_REGISTRY_KEY` URL list.
    pub client: Client,
    /// Subject naming strategy; `from_format_options` uses the default when
    /// `NAME_STRATEGY_KEY` is absent.
    pub name_strategy: PbSchemaRegistryNameStrategy,
    /// Topic that the subject name is derived from.
    pub topic: String,
    /// Record name for the key schema (`KEY_MESSAGE_NAME_KEY` option), if configured.
    pub key_record_name: Option<String>,
    /// Record name for the value schema (`MESSAGE_NAME_KEY` option), if configured.
    pub val_record_name: Option<String>,
}
41
/// Loads schemas from the AWS Glue schema registry, or from a fixed mock definition.
pub enum GlueSchemaLoader {
    /// Asks Glue for the latest schema version of `schema_arn` on every load.
    Real {
        client: aws_sdk_glue::Client,
        schema_arn: String,
    },
    /// Serves a fixed definition; built from the `aws.glue.mock_config` option.
    Mock {
        schema_version_id: uuid::Uuid,
        definition: String,
    },
}
52
53pub enum SchemaVersion {
54 Confluent(i32),
55 Glue(uuid::Uuid),
56}
57
58impl ConfluentSchemaLoader {
59 pub fn from_format_options(
60 topic: &str,
61 format_options: &BTreeMap<String, String>,
62 ) -> Result<Self, SchemaFetchError> {
63 let schema_location = format_options
64 .get(SCHEMA_REGISTRY_KEY)
65 .ok_or_else(|| invalid_option_error!("{SCHEMA_REGISTRY_KEY} required"))?;
66 let client_config = format_options.into();
67 let urls = handle_sr_list(schema_location)?;
68 let client = Client::new(urls, &client_config)?;
69
70 let name_strategy = format_options
71 .get(NAME_STRATEGY_KEY)
72 .map(|s| {
73 name_strategy_from_str(s)
74 .ok_or_else(|| invalid_option_error!("unrecognized strategy {s}"))
75 })
76 .transpose()?
77 .unwrap_or_default();
78 let key_record_name = format_options.get(KEY_MESSAGE_NAME_KEY).cloned();
79 let val_record_name = format_options.get(MESSAGE_NAME_KEY).cloned();
80
81 Ok(Self {
82 client,
83 name_strategy,
84 topic: topic.into(),
85 key_record_name,
86 val_record_name,
87 })
88 }
89
90 async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
91 &self,
92 ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
93 let record = match IS_KEY {
94 true => self.key_record_name.as_deref(),
95 false => self.val_record_name.as_deref(),
96 };
97 let subject = get_subject_by_strategy(&self.name_strategy, &self.topic, record, IS_KEY)?;
98 let (primary_subject, dependency_subjects) =
99 self.client.get_subject_and_references(&subject).await?;
100 let schema_id = primary_subject.schema.id;
101 let out = Out::compile(primary_subject, dependency_subjects)?;
102 Ok((SchemaVersion::Confluent(schema_id), out))
103 }
104}
105
106impl GlueSchemaLoader {
107 pub async fn from_format_options(
108 schema_arn: &str,
109 format_options: &BTreeMap<String, String>,
110 ) -> Result<Self, SchemaFetchError> {
111 risingwave_common::license::Feature::GlueSchemaRegistry.check_available()?;
112 if let Some(mock_config) = format_options.get("aws.glue.mock_config") {
113 let parsed: serde_json::Value =
115 serde_json::from_str(mock_config).expect("mock config shall be valid json");
116 let schema_version_id_str = parsed
117 .get("arn_to_latest_id")
118 .unwrap()
119 .as_object()
120 .unwrap()
121 .get(schema_arn)
122 .unwrap()
123 .as_str()
124 .unwrap();
125 let definition = parsed
126 .get("by_id")
127 .unwrap()
128 .as_object()
129 .unwrap()
130 .get(schema_version_id_str)
131 .unwrap()
132 .to_string();
133 return Ok(Self::Mock {
134 schema_version_id: schema_version_id_str.parse()?,
135 definition,
136 });
137 };
138 let aws_auth_props =
139 serde_json::from_value::<AwsAuthProps>(serde_json::to_value(format_options).unwrap())
140 .map_err(|_e| invalid_option_error!(""))?;
141 let client = aws_sdk_glue::Client::new(
142 &aws_auth_props
143 .build_config()
144 .await
145 .map_err(SchemaFetchError::YetToMigrate)?,
146 );
147 Ok(Self::Real {
148 client,
149 schema_arn: schema_arn.to_owned(),
150 })
151 }
152
153 async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
154 &self,
155 ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
156 if IS_KEY {
157 return Err(invalid_option_error!(
158 "GlueSchemaRegistry cannot be key. Specify `KEY ENCODE [TEXT | BYTES]` please."
159 )
160 .into());
161 }
162 let (schema_version_id, definition) = match self {
163 Self::Mock {
164 schema_version_id,
165 definition,
166 } => (*schema_version_id, definition.clone()),
167 Self::Real { client, schema_arn } => {
168 use aws_sdk_glue::types::{SchemaId, SchemaVersionNumber};
169
170 let res = client
171 .get_schema_version()
172 .schema_id(SchemaId::builder().schema_arn(schema_arn).build())
173 .schema_version_number(
174 SchemaVersionNumber::builder().latest_version(true).build(),
175 )
176 .send()
177 .await
178 .map_err(|e| Box::new(e.into_service_error()))?;
179 let schema_version_id = res
180 .schema_version_id()
181 .ok_or_else(|| malformed_response_error!("missing schema_version_id"))?
182 .parse()?;
183 let definition = res
184 .schema_definition()
185 .ok_or_else(|| malformed_response_error!("missing schema_definition"))?
186 .to_owned();
187 (schema_version_id, definition)
188 }
189 };
190
191 let primary = Subject {
194 version: 0,
195 name: "".to_owned(),
196 schema: super::schema_registry::ConfluentSchema {
197 id: 0,
198 content: definition,
199 },
200 };
201 let out = Out::compile(primary, vec![])?;
202 Ok((SchemaVersion::Glue(schema_version_id), out))
203 }
204}
205
206impl SchemaLoader {
207 pub async fn from_format_options(
208 topic: &str,
209 format_options: &BTreeMap<String, String>,
210 ) -> Result<Self, SchemaFetchError> {
211 if let Some(schema_arn) = format_options.get(AWS_GLUE_SCHEMA_ARN_KEY) {
212 Ok(Self::Glue(
213 GlueSchemaLoader::from_format_options(schema_arn, format_options).await?,
214 ))
215 } else {
216 Ok(Self::Confluent(ConfluentSchemaLoader::from_format_options(
217 topic,
218 format_options,
219 )?))
220 }
221 }
222
223 async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
224 &self,
225 ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
226 match self {
227 Self::Confluent(inner) => inner.load_schema::<Out, IS_KEY>().await,
228 Self::Glue(inner) => inner.load_schema::<Out, IS_KEY>().await,
229 }
230 }
231
232 pub async fn load_key_schema<Out: LoadedSchema>(
233 &self,
234 ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
235 self.load_schema::<Out, true>().await
236 }
237
238 pub async fn load_val_schema<Out: LoadedSchema>(
239 &self,
240 ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
241 self.load_schema::<Out, false>().await
242 }
243}
244
/// A schema representation that the loaders can produce from registry subjects.
pub trait LoadedSchema: Sized {
    /// Compiles `Self` from the `primary` subject and the subjects it references.
    ///
    /// The Glue loader always passes an empty `references` list.
    fn compile(primary: Subject, references: Vec<Subject>) -> Result<Self, SchemaFetchError>;
}