risingwave_connector/schema/
loader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
18
19use super::schema_registry::{
20    Client, Subject, get_subject_by_strategy, handle_sr_list, name_strategy_from_str,
21};
22use super::{
23    AWS_GLUE_SCHEMA_ARN_KEY, InvalidOptionError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY,
24    MalformedResponseError, NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY, SchemaFetchError,
25    invalid_option_error, malformed_response_error,
26};
27use crate::connector_common::AwsAuthProps;
28
29pub enum SchemaLoader {
30    Confluent(ConfluentSchemaLoader),
31    Glue(GlueSchemaLoader),
32}
33
34pub struct ConfluentSchemaLoader {
35    pub client: Client,
36    pub name_strategy: PbSchemaRegistryNameStrategy,
37    pub topic: String,
38    pub key_record_name: Option<String>,
39    pub val_record_name: Option<String>,
40}
41
42pub enum GlueSchemaLoader {
43    Real {
44        client: aws_sdk_glue::Client,
45        schema_arn: String,
46    },
47    Mock {
48        schema_version_id: uuid::Uuid,
49        definition: String,
50    },
51}
52
53pub enum SchemaVersion {
54    Confluent(i32),
55    Glue(uuid::Uuid),
56}
57
58impl ConfluentSchemaLoader {
59    pub fn from_format_options(
60        topic: &str,
61        format_options: &BTreeMap<String, String>,
62    ) -> Result<Self, SchemaFetchError> {
63        let schema_location = format_options
64            .get(SCHEMA_REGISTRY_KEY)
65            .ok_or_else(|| invalid_option_error!("{SCHEMA_REGISTRY_KEY} required"))?;
66        let client_config = format_options.into();
67        let urls = handle_sr_list(schema_location)?;
68        let client = Client::new(urls, &client_config)?;
69
70        let name_strategy = format_options
71            .get(NAME_STRATEGY_KEY)
72            .map(|s| {
73                name_strategy_from_str(s)
74                    .ok_or_else(|| invalid_option_error!("unrecognized strategy {s}"))
75            })
76            .transpose()?
77            .unwrap_or_default();
78        let key_record_name = format_options.get(KEY_MESSAGE_NAME_KEY).cloned();
79        let val_record_name = format_options.get(MESSAGE_NAME_KEY).cloned();
80
81        Ok(Self {
82            client,
83            name_strategy,
84            topic: topic.into(),
85            key_record_name,
86            val_record_name,
87        })
88    }
89
90    async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
91        &self,
92    ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
93        let record = match IS_KEY {
94            true => self.key_record_name.as_deref(),
95            false => self.val_record_name.as_deref(),
96        };
97        let subject = get_subject_by_strategy(&self.name_strategy, &self.topic, record, IS_KEY)?;
98        let (primary_subject, dependency_subjects) =
99            self.client.get_subject_and_references(&subject).await?;
100        let schema_id = primary_subject.schema.id;
101        let out = Out::compile(primary_subject, dependency_subjects)?;
102        Ok((SchemaVersion::Confluent(schema_id), out))
103    }
104}
105
106impl GlueSchemaLoader {
107    pub async fn from_format_options(
108        schema_arn: &str,
109        format_options: &BTreeMap<String, String>,
110    ) -> Result<Self, SchemaFetchError> {
111        risingwave_common::license::Feature::GlueSchemaRegistry.check_available()?;
112        if let Some(mock_config) = format_options.get("aws.glue.mock_config") {
113            // Internal format for easy testing. See `MockGlueSchemaCache` for details.
114            let parsed: serde_json::Value =
115                serde_json::from_str(mock_config).expect("mock config shall be valid json");
116            let schema_version_id_str = parsed
117                .get("arn_to_latest_id")
118                .unwrap()
119                .as_object()
120                .unwrap()
121                .get(schema_arn)
122                .unwrap()
123                .as_str()
124                .unwrap();
125            let definition = parsed
126                .get("by_id")
127                .unwrap()
128                .as_object()
129                .unwrap()
130                .get(schema_version_id_str)
131                .unwrap()
132                .to_string();
133            return Ok(Self::Mock {
134                schema_version_id: schema_version_id_str.parse()?,
135                definition,
136            });
137        };
138        let aws_auth_props =
139            serde_json::from_value::<AwsAuthProps>(serde_json::to_value(format_options).unwrap())
140                .map_err(|_e| invalid_option_error!(""))?;
141        let client = aws_sdk_glue::Client::new(
142            &aws_auth_props
143                .build_config()
144                .await
145                .map_err(SchemaFetchError::YetToMigrate)?,
146        );
147        Ok(Self::Real {
148            client,
149            schema_arn: schema_arn.to_owned(),
150        })
151    }
152
153    async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
154        &self,
155    ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
156        if IS_KEY {
157            return Err(invalid_option_error!(
158                "GlueSchemaRegistry cannot be key. Specify `KEY ENCODE [TEXT | BYTES]` please."
159            )
160            .into());
161        }
162        let (schema_version_id, definition) = match self {
163            Self::Mock {
164                schema_version_id,
165                definition,
166            } => (*schema_version_id, definition.clone()),
167            Self::Real { client, schema_arn } => {
168                use aws_sdk_glue::types::{SchemaId, SchemaVersionNumber};
169
170                let res = client
171                    .get_schema_version()
172                    .schema_id(SchemaId::builder().schema_arn(schema_arn).build())
173                    .schema_version_number(
174                        SchemaVersionNumber::builder().latest_version(true).build(),
175                    )
176                    .send()
177                    .await
178                    .map_err(|e| Box::new(e.into_service_error()))?;
179                let schema_version_id = res
180                    .schema_version_id()
181                    .ok_or_else(|| malformed_response_error!("missing schema_version_id"))?
182                    .parse()?;
183                let definition = res
184                    .schema_definition()
185                    .ok_or_else(|| malformed_response_error!("missing schema_definition"))?
186                    .to_owned();
187                (schema_version_id, definition)
188            }
189        };
190
191        // https://github.com/awslabs/aws-glue-schema-registry/issues/32
192        // No references in AWS Glue Schema Registry yet
193        let primary = Subject {
194            version: 0,
195            name: "".to_owned(),
196            schema: super::schema_registry::ConfluentSchema {
197                id: 0,
198                content: definition,
199            },
200        };
201        let out = Out::compile(primary, vec![])?;
202        Ok((SchemaVersion::Glue(schema_version_id), out))
203    }
204}
205
206impl SchemaLoader {
207    pub async fn from_format_options(
208        topic: &str,
209        format_options: &BTreeMap<String, String>,
210    ) -> Result<Self, SchemaFetchError> {
211        if let Some(schema_arn) = format_options.get(AWS_GLUE_SCHEMA_ARN_KEY) {
212            Ok(Self::Glue(
213                GlueSchemaLoader::from_format_options(schema_arn, format_options).await?,
214            ))
215        } else {
216            Ok(Self::Confluent(ConfluentSchemaLoader::from_format_options(
217                topic,
218                format_options,
219            )?))
220        }
221    }
222
223    async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
224        &self,
225    ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
226        match self {
227            Self::Confluent(inner) => inner.load_schema::<Out, IS_KEY>().await,
228            Self::Glue(inner) => inner.load_schema::<Out, IS_KEY>().await,
229        }
230    }
231
232    pub async fn load_key_schema<Out: LoadedSchema>(
233        &self,
234    ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
235        self.load_schema::<Out, true>().await
236    }
237
238    pub async fn load_val_schema<Out: LoadedSchema>(
239        &self,
240    ) -> Result<(SchemaVersion, Out), SchemaFetchError> {
241        self.load_schema::<Out, false>().await
242    }
243}
244
245pub trait LoadedSchema: Sized {
246    fn compile(primary: Subject, references: Vec<Subject>) -> Result<Self, SchemaFetchError>;
247}