1mod jni_catalog;
16mod mock_catalog;
17mod storage_catalog;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
22use ::iceberg::table::Table;
23use ::iceberg::{Catalog, TableIdent};
24use anyhow::{Context, anyhow};
25use iceberg::io::{
26 AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
27 GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD,
28};
29use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
30use phf::{Set, phf_set};
31use risingwave_common::bail;
32use serde_derive::Deserialize;
33use serde_with::serde_as;
34use url::Url;
35use with_options::WithOptions;
36
37use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
38use crate::deserialize_optional_bool_from_string;
39use crate::enforce_secret::EnforceSecret;
40use crate::error::ConnectorResult;
41
/// Iceberg connection options.
///
/// Deserialized with serde from string properties; the property key of each field is given by
/// its `rename` attribute (or by the field name when there is no `rename`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type. When unset, `catalog_type()` falls back to `storage`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also used as the AWS region for the `glue` and `glue_rust` catalogs.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    /// S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    /// S3 access key. Also used as the AWS access key for the Glue catalogs and, when
    /// `catalog.rest.sigv4_enabled` is set, for the `rest` catalog.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    /// S3 secret key. Used alongside `access_key` in the same places.
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    /// GCS credential, forwarded as the file-io `GCS_CREDENTIALS_JSON` property.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// Warehouse location. Required by the `storage` and `glue_rust` catalogs and by every
    /// JNI-backed catalog except `rest`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Forwarded as `glue.id` to the `glue` catalog when set.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name. When unset, `catalog_name()` falls back to `risingwave`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog URI. Required by the `rest_rust` catalog; forwarded as `uri` to JNI-backed
    /// catalogs and to the `glue_rust` catalog when set.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Optional database name; when set it becomes the first component of the table identifier.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name; the last component of the table identifier.
    #[serde(rename = "table.name")]
    pub table_name: String,
    /// Forwarded as `credential` to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// Forwarded as `token` to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// Forwarded as `oauth2-server-uri` to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// Forwarded as `scope` to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// Forwarded as `rest.signing-region` to the `rest` catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// Forwarded as `rest.signing-name` to the `rest` catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Forwarded as `rest.sigv4-enabled` to the `rest` catalog. Whenever this is set (to either
    /// value), the S3 access key pair is also forwarded as the REST signing credentials.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Forwarded as `s3.path-style-access` to JNI-backed catalogs when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    /// Property key is the field name itself (`enable_config_load`). When unset or `false`,
    /// the JNI file-io properties disable S3/GCS config loading and the `glue` catalog is given
    /// the explicit access key pair through a credentials provider.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// NOTE(review): not read by any method visible in this part of the file — confirm its
    /// consumers before documenting its meaning.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,
}
137
// Declares which `IcebergCommon` property keys are subject to secret enforcement.
// Presumably these keys must be supplied through a secret rather than as plain text; the
// exact policy lives in the `EnforceSecret` trait, which is not visible here.
impl EnforceSecret for IcebergCommon {
    // NOTE(review): `azblob.account_key` also looks like a credential but is not listed —
    // confirm whether that omission is intentional.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
    };
}
148
149impl IcebergCommon {
150 pub fn catalog_type(&self) -> &str {
151 self.catalog_type.as_deref().unwrap_or("storage")
152 }
153
154 pub fn catalog_name(&self) -> String {
155 self.catalog_name
156 .as_ref()
157 .map(|s| s.to_string())
158 .unwrap_or_else(|| "risingwave".to_owned())
159 }
160
161 fn build_jni_catalog_configs(
163 &self,
164 java_catalog_props: &HashMap<String, String>,
165 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
166 let mut iceberg_configs = HashMap::new();
167 let enable_config_load = self.enable_config_load.unwrap_or(false);
168 let file_io_props = {
169 let catalog_type = self.catalog_type().to_owned();
170
171 if let Some(region) = &self.region {
172 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
174 }
175
176 if let Some(endpoint) = &self.endpoint {
177 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
179 }
180
181 if let Some(access_key) = &self.access_key {
183 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
184 }
185 if let Some(secret_key) = &self.secret_key {
186 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
187 }
188 if let Some(gcs_credential) = &self.gcs_credential {
189 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
190 if catalog_type != "rest" && catalog_type != "rest_rust" {
191 bail!("gcs unsupported in {} catalog", &catalog_type);
192 }
193 }
194
195 if let (
196 Some(azblob_account_name),
197 Some(azblob_account_key),
198 Some(azblob_endpoint_url),
199 ) = (
200 &self.azblob_account_name,
201 &self.azblob_account_key,
202 &self.azblob_endpoint_url,
203 ) {
204 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
205 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
206 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
207
208 if catalog_type != "rest" && catalog_type != "rest_rust" {
209 bail!("azblob unsupported in {} catalog", &catalog_type);
210 }
211 }
212
213 match &self.warehouse_path {
214 Some(warehouse_path) => {
215 let (bucket, _) = {
216 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
217 let url = Url::parse(warehouse_path);
218 if (url.is_err() || is_s3_tables)
219 && (catalog_type == "rest" || catalog_type == "rest_rust")
220 {
221 (None, None)
225 } else {
226 let url = url.with_context(|| {
227 format!("Invalid warehouse path: {}", warehouse_path)
228 })?;
229 let bucket = url
230 .host_str()
231 .with_context(|| {
232 format!(
233 "Invalid s3 path: {}, bucket is missing",
234 warehouse_path
235 )
236 })?
237 .to_owned();
238 let root = url.path().trim_start_matches('/').to_owned();
239 (Some(bucket), Some(root))
240 }
241 };
242
243 if let Some(bucket) = bucket {
244 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
245 }
246 }
247 None => {
248 if catalog_type != "rest" && catalog_type != "rest_rust" {
249 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
250 }
251 }
252 }
253 iceberg_configs.insert(
254 S3_DISABLE_CONFIG_LOAD.to_owned(),
255 (!enable_config_load).to_string(),
256 );
257
258 iceberg_configs.insert(
259 GCS_DISABLE_CONFIG_LOAD.to_owned(),
260 (!enable_config_load).to_string(),
261 );
262
263 iceberg_configs
264 };
265
266 let mut java_catalog_configs = HashMap::new();
268 {
269 if let Some(uri) = self.catalog_uri.as_deref() {
270 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
271 }
272
273 if let Some(warehouse_path) = &self.warehouse_path {
274 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
275 }
276 java_catalog_configs.extend(java_catalog_props.clone());
277
278 java_catalog_configs.insert(
280 "io-impl".to_owned(),
281 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
282 );
283
284 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
286
287 if let Some(region) = &self.region {
288 java_catalog_configs.insert("client.region".to_owned(), region.clone());
289 }
290 if let Some(endpoint) = &self.endpoint {
291 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
292 }
293
294 if let Some(access_key) = &self.access_key {
295 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
296 }
297 if let Some(secret_key) = &self.secret_key {
298 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
299 }
300
301 if let Some(path_style_access) = self.path_style_access {
302 java_catalog_configs.insert(
303 "s3.path-style-access".to_owned(),
304 path_style_access.to_string(),
305 );
306 }
307
308 match self.catalog_type.as_deref() {
309 Some("rest") => {
310 if let Some(credential) = &self.credential {
311 java_catalog_configs.insert("credential".to_owned(), credential.clone());
312 }
313 if let Some(token) = &self.token {
314 java_catalog_configs.insert("token".to_owned(), token.clone());
315 }
316 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
317 java_catalog_configs
318 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
319 }
320 if let Some(scope) = &self.scope {
321 java_catalog_configs.insert("scope".to_owned(), scope.clone());
322 }
323 if let Some(rest_signing_region) = &self.rest_signing_region {
324 java_catalog_configs.insert(
325 "rest.signing-region".to_owned(),
326 rest_signing_region.clone(),
327 );
328 }
329 if let Some(rest_signing_name) = &self.rest_signing_name {
330 java_catalog_configs
331 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
332 }
333 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
334 java_catalog_configs.insert(
335 "rest.sigv4-enabled".to_owned(),
336 rest_sigv4_enabled.to_string(),
337 );
338
339 if let Some(access_key) = &self.access_key {
340 java_catalog_configs
341 .insert("rest.access-key-id".to_owned(), access_key.clone());
342 }
343
344 if let Some(secret_key) = &self.secret_key {
345 java_catalog_configs
346 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
347 }
348 }
349 }
350 Some("glue") => {
351 if !enable_config_load {
352 java_catalog_configs.insert(
353 "client.credentials-provider".to_owned(),
354 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
355 );
356 if let Some(access_key) = &self.access_key {
359 java_catalog_configs.insert(
360 "client.credentials-provider.glue.access-key-id".to_owned(),
361 access_key.clone(),
362 );
363 }
364 if let Some(secret_key) = &self.secret_key {
365 java_catalog_configs.insert(
366 "client.credentials-provider.glue.secret-access-key".to_owned(),
367 secret_key.clone(),
368 );
369 }
370 }
371
372 if let Some(region) = &self.region {
373 java_catalog_configs.insert("client.region".to_owned(), region.clone());
374 java_catalog_configs.insert(
375 "glue.endpoint".to_owned(),
376 format!("https://glue.{}.amazonaws.com", region),
377 );
378 }
379
380 if let Some(glue_id) = self.glue_id.as_deref() {
381 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
382 }
383 }
384 _ => {}
385 }
386 }
387
388 Ok((file_io_props, java_catalog_configs))
389 }
390}
391
392impl IcebergCommon {
393 pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
394 let ret = if let Some(database_name) = &self.database_name {
395 TableIdent::from_strs(vec![database_name, &self.table_name])
396 } else {
397 TableIdent::from_strs(vec![&self.table_name])
398 };
399
400 Ok(ret.context("Failed to create table identifier")?)
401 }
402
403 pub async fn create_catalog(
405 &self,
406 java_catalog_props: &HashMap<String, String>,
407 ) -> ConnectorResult<Arc<dyn Catalog>> {
408 match self.catalog_type() {
409 "storage" => {
410 let warehouse = self
411 .warehouse_path
412 .clone()
413 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
414 let url = Url::parse(warehouse.as_ref())
415 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
416
417 let config = match url.scheme() {
418 "s3" | "s3a" => StorageCatalogConfig::S3(
419 storage_catalog::StorageCatalogS3Config::builder()
420 .warehouse(warehouse)
421 .access_key(self.access_key.clone())
422 .secret_key(self.secret_key.clone())
423 .region(self.region.clone())
424 .endpoint(self.endpoint.clone())
425 .enable_config_load(self.enable_config_load)
426 .build(),
427 ),
428 "gs" | "gcs" => StorageCatalogConfig::Gcs(
429 storage_catalog::StorageCatalogGcsConfig::builder()
430 .warehouse(warehouse)
431 .credential(self.gcs_credential.clone())
432 .enable_config_load(self.enable_config_load)
433 .build(),
434 ),
435 "azblob" => StorageCatalogConfig::Azblob(
436 storage_catalog::StorageCatalogAzblobConfig::builder()
437 .warehouse(warehouse)
438 .account_name(self.azblob_account_name.clone())
439 .account_key(self.azblob_account_key.clone())
440 .endpoint(self.azblob_endpoint_url.clone())
441 .build(),
442 ),
443 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
444 };
445
446 let catalog = storage_catalog::StorageCatalog::new(config)?;
447 Ok(Arc::new(catalog))
448 }
449 "rest_rust" => {
450 let mut iceberg_configs = HashMap::new();
451
452 if let Some(gcs_credential) = &self.gcs_credential {
454 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
455 } else {
456 if let Some(region) = &self.region {
457 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
458 }
459 if let Some(endpoint) = &self.endpoint {
460 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
461 }
462 if let Some(access_key) = &self.access_key {
463 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
464 }
465 if let Some(secret_key) = &self.secret_key {
466 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
467 }
468 };
469
470 if let Some(credential) = &self.credential {
471 iceberg_configs.insert("credential".to_owned(), credential.clone());
472 }
473 if let Some(token) = &self.token {
474 iceberg_configs.insert("token".to_owned(), token.clone());
475 }
476 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
477 iceberg_configs
478 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
479 }
480 if let Some(scope) = &self.scope {
481 iceberg_configs.insert("scope".to_owned(), scope.clone());
482 }
483
484 let config_builder =
485 iceberg_catalog_rest::RestCatalogConfig::builder()
486 .uri(self.catalog_uri.clone().with_context(|| {
487 "`catalog.uri` must be set in rest catalog".to_owned()
488 })?)
489 .props(iceberg_configs);
490
491 let config = match &self.warehouse_path {
492 Some(warehouse_path) => {
493 config_builder.warehouse(warehouse_path.clone()).build()
494 }
495 None => config_builder.build(),
496 };
497 let catalog = iceberg_catalog_rest::RestCatalog::new(config);
498 Ok(Arc::new(catalog))
499 }
500 "glue_rust" => {
501 let mut iceberg_configs = HashMap::new();
502 if let Some(region) = &self.region {
504 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
505 }
506 if let Some(access_key) = &self.access_key {
507 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
508 }
509 if let Some(secret_key) = &self.secret_key {
510 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
511 }
512 if let Some(region) = &self.region {
514 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
515 }
516 if let Some(endpoint) = &self.endpoint {
517 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
518 }
519 if let Some(access_key) = &self.access_key {
520 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
521 }
522 if let Some(secret_key) = &self.secret_key {
523 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
524 }
525 let config_builder =
526 iceberg_catalog_glue::GlueCatalogConfig::builder()
527 .warehouse(self.warehouse_path.clone().ok_or_else(|| {
528 anyhow!("`warehouse.path` must be set in glue catalog")
529 })?)
530 .props(iceberg_configs);
531 let config = if let Some(uri) = self.catalog_uri.as_deref() {
532 config_builder.uri(uri.to_owned()).build()
533 } else {
534 config_builder.build()
535 };
536 let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
537 Ok(Arc::new(catalog))
538 }
539 catalog_type
540 if catalog_type == "hive"
541 || catalog_type == "snowflake"
542 || catalog_type == "jdbc"
543 || catalog_type == "rest"
544 || catalog_type == "glue" =>
545 {
546 let (file_io_props, java_catalog_props) =
548 self.build_jni_catalog_configs(java_catalog_props)?;
549 let catalog_impl = match catalog_type {
550 "hive" => "org.apache.iceberg.hive.HiveCatalog",
551 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
552 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
553 "rest" => "org.apache.iceberg.rest.RESTCatalog",
554 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
555 _ => unreachable!(),
556 };
557
558 jni_catalog::JniCatalog::build_catalog(
559 file_io_props,
560 self.catalog_name(),
561 catalog_impl,
562 java_catalog_props,
563 )
564 }
565 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
566 _ => {
567 bail!(
568 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
569 self.catalog_type()
570 )
571 }
572 }
573 }
574
575 pub async fn load_table(
577 &self,
578 java_catalog_props: &HashMap<String, String>,
579 ) -> ConnectorResult<Table> {
580 let catalog = self
581 .create_catalog(java_catalog_props)
582 .await
583 .context("Unable to load iceberg catalog")?;
584
585 let table_id = self
586 .full_table_name()
587 .context("Unable to parse table name")?;
588
589 catalog.load_table(&table_id).await.map_err(Into::into)
590 }
591}