1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::{
30 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
31 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
32};
33use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
34use phf::{Set, phf_set};
35use risingwave_common::bail;
36use risingwave_common::util::deployment::Deployment;
37use risingwave_common::util::env_var::env_var_is_true;
38use serde::Deserialize;
39use serde_with::serde_as;
40use url::Url;
41use with_options::WithOptions;
42
43use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
44use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
45use crate::deserialize_optional_bool_from_string;
46use crate::enforce_secret::EnforceSecret;
47use crate::error::ConnectorResult;
48
/// Connection options shared by Iceberg sources and sinks.
///
/// Every field is optional at deserialization time; which ones are actually
/// required depends on the catalog type and is validated when the catalog is
/// built (see `create_catalog` and `build_jni_catalog_configs`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type. When unset, `catalog_type()` falls back to `storage`.
    /// `create_catalog` accepts `storage`, `rest`, `hive`, `jdbc`, `glue`,
    /// `snowflake`, `glue_rust`, `rest_rust` and `mock`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also used as the fallback for `glue.region`.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    /// S3 endpoint forwarded to the file IO and Java catalog properties.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    /// S3 access key id. Also used as the fallback for `glue.access.key`.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    /// S3 secret access key. Also used as the fallback for `glue.secret.key`.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    /// IAM role ARN forwarded to the file IO as the S3 assume-role ARN.
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    /// Access key used for Glue; falls back to `s3.access.key` when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    /// Secret key used for Glue; falls back to `s3.secret.key` when unset.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    /// IAM role ARN handed to the Glue credential provider of the Java catalog.
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    /// Region used for Glue; falls back to `s3.region` when unset.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// Forwarded verbatim to the Java Glue catalog as `glue.id`.
    /// NOTE(review): presumably the AWS account / Glue catalog id — confirm.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    /// GCS credential, forwarded to the file IO as the GCS credentials JSON.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name. The JNI path only forwards the azblob options
    /// when name, key and endpoint URL are all set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// ADLS Gen2 account name. Forwarded only when the account key is also set.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    /// ADLS Gen2 account key.
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    /// ADLS Gen2 endpoint.
    /// NOTE(review): not read by the catalog-building code in this file section;
    /// confirm where it is consumed.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Warehouse location. Mandatory for the `storage` and `glue_rust` catalogs
    /// and for every JNI catalog other than `rest`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name; `catalog_name()` defaults it to `risingwave`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog URI. Mandatory for the `rest_rust` catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Forwarded to REST catalogs as the `credential` property.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// Forwarded to REST catalogs as the `token` property.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// Forwarded to REST catalogs as the `oauth2-server-uri` property.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// Forwarded to REST catalogs as the `scope` property.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-region`.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-name`.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.sigv4-enabled`. Whenever this
    /// option is present, the S3 access/secret keys are also passed as
    /// `rest.access-key-id` / `rest.secret-access-key`.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Whether S3 path-style access is used; forwarded only when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Whether credentials may be loaded from the environment. Read through
    /// `enable_config_load()`, which forces `false` when the
    /// `DISABLE_DEFAULT_CREDENTIAL` environment flag is true. Has no serde
    /// rename, so the option key is the field name itself.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// NOTE(review): not read by the catalog-building code in this file section;
    /// presumably marks the catalog as hosted by RisingWave — confirm.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// Extra HTTP headers for the catalog, written as `;`-separated `key=value`
    /// pairs (e.g. `k1=v1;k2=v2`). Parsed by `headers()`.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// When true, `headers()` adds `X-Iceberg-Access-Delegation: vended-credentials`
    /// and a `rest` catalog type is mapped to `rest_rust` by `catalog_type()`.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
170
/// Option keys of [`IcebergCommon`] that are registered as secret-enforced
/// properties for the `EnforceSecret` machinery.
///
/// NOTE(review): `adlsgen2.client_secret` has no matching field in the
/// `IcebergCommon` struct shown in this file, and `azblob.account_key` is a
/// credential-like option that is not listed here — confirm both are intended.
impl EnforceSecret for IcebergCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
185
/// Identifies an Iceberg table by an optional database name plus a table name.
///
/// Unknown option keys are rejected during deserialization
/// (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    /// Optional database name; when absent the table identifier consists of
    /// the table name only.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name (required).
    #[serde(rename = "table.name")]
    pub table_name: String,
}
196
197impl IcebergTableIdentifier {
198 pub fn database_name(&self) -> Option<&str> {
199 self.database_name.as_deref()
200 }
201
202 pub fn table_name(&self) -> &str {
203 &self.table_name
204 }
205
206 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
207 let ret = if let Some(database_name) = &self.database_name {
208 TableIdent::from_strs(vec![database_name, &self.table_name])
209 } else {
210 TableIdent::from_strs(vec![&self.table_name])
211 };
212
213 Ok(ret.context("Failed to create table identifier")?)
214 }
215}
216
217impl IcebergCommon {
218 pub fn catalog_type(&self) -> &str {
219 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
220 if self.vended_credentials() && catalog_type == "rest" {
221 "rest_rust"
222 } else {
223 catalog_type
224 }
225 }
226
227 pub fn vended_credentials(&self) -> bool {
228 self.vended_credentials.unwrap_or(false)
229 }
230
231 fn glue_access_key(&self) -> Option<&str> {
232 self.glue_access_key
233 .as_deref()
234 .or(self.s3_access_key.as_deref())
235 }
236
237 fn glue_secret_key(&self) -> Option<&str> {
238 self.glue_secret_key
239 .as_deref()
240 .or(self.s3_secret_key.as_deref())
241 }
242
243 fn glue_region(&self) -> Option<&str> {
244 self.glue_region.as_deref().or(self.s3_region.as_deref())
245 }
246
247 pub fn catalog_name(&self) -> String {
248 self.catalog_name
249 .as_ref()
250 .cloned()
251 .unwrap_or_else(|| "risingwave".to_owned())
252 }
253
254 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
255 let mut headers = HashMap::new();
256 let user_agent = match Deployment::current() {
257 Deployment::Ci => "RisingWave(CI)".to_owned(),
258 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
259 Deployment::Other => "RisingWave(OSS)".to_owned(),
260 };
261 if self.vended_credentials() {
262 headers.insert(
263 "X-Iceberg-Access-Delegation".to_owned(),
264 "vended-credentials".to_owned(),
265 );
266 }
267 headers.insert("User-Agent".to_owned(), user_agent);
268 if let Some(header) = &self.catalog_header {
269 for pair in header.split(';') {
270 let mut parts = pair.split('=');
271 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
272 headers.insert(key.to_owned(), value.to_owned());
273 } else {
274 bail!("Invalid header format: {}", pair);
275 }
276 }
277 }
278 Ok(headers)
279 }
280
281 pub fn enable_config_load(&self) -> bool {
282 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
284 if matches!(self.enable_config_load, Some(true)) {
285 tracing::warn!(
286 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
287 );
288 }
289 return false;
290 }
291 self.enable_config_load.unwrap_or(false)
292 }
293
    /// Builds the two property maps used to construct a JNI-backed (Java) catalog.
    ///
    /// Returns `(file_io_props, java_catalog_configs)`:
    /// * `file_io_props` — properties for the Rust-side file IO (S3 / GCS /
    ///   Azure credentials, the bucket derived from `warehouse.path`, the
    ///   "disable config load" switches and the path-style flag).
    /// * `java_catalog_configs` — properties handed to the Java catalog: `uri`,
    ///   `warehouse`, the caller-supplied `java_catalog_props`, S3 client
    ///   settings, `header.*` entries and catalog-type specific options for
    ///   `rest` and `glue`.
    ///
    /// Insertion order matters: keys inserted after `java_catalog_props` is
    /// merged override user-supplied values with the same name.
    ///
    /// # Errors
    /// * GCS, azblob or adlsgen2 credentials are set for a catalog type other
    ///   than `rest` / `rest_rust`.
    /// * `warehouse.path` is missing for a non-REST catalog, is not a valid URL,
    ///   or has no host component to use as the bucket.
    /// * `catalog.header` is malformed (propagated from `headers()`).
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        let file_io_props = {
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.s3_region {
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.s3_endpoint {
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(role_arn) = &self.s3_iam_role_arn {
                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
            }
            // Non-S3 object stores are only accepted together with a REST catalog.
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            // The azblob options are applied only when all three are present;
            // a partial set is silently ignored.
            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            // Likewise, the ADLS Gen2 options need both account name and key.
            if let (Some(account_name), Some(account_key)) = (
                self.adlsgen2_account_name.as_ref(),
                self.adlsgen2_account_key.as_ref(),
            ) {
                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    // Only the bucket is used below; the root path is computed
                    // but discarded.
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        if (url.is_err() || is_s3_tables)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            // For REST catalogs the warehouse may be something
                            // other than a URL (or an S3 Tables ARN); no bucket
                            // is derived in that case.
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    // Only REST catalogs may omit the warehouse path.
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            // The file IO keys are "disable" switches, hence the negation.
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.s3_path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            // User-supplied props may override `uri` / `warehouse` above, but
            // are themselves overridden by every key inserted below.
            java_catalog_configs.extend(java_catalog_props.clone());

            // The Java file IO implementation is always S3FileIO, regardless of
            // which object-store credentials were configured.
            java_catalog_configs.insert(
                "io-impl".to_owned(),
                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
            );

            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            if let Some(region) = &self.s3_region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.s3_endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.s3_path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            // Each HTTP header becomes a `header.<name>` catalog property.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            match self.catalog_type() {
                "rest" => {
                    if let Some(credential) = &self.catalog_credential {
                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.catalog_token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.catalog_scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    // The signing credentials are forwarded whenever the option
                    // is present, whether its value is `true` or `false`.
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.s3_access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.s3_secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                "glue" => {
                    let glue_access_key = self.glue_access_key();
                    let glue_secret_key = self.glue_secret_key();
                    let has_glue_credentials =
                        glue_access_key.is_some() && glue_secret_key.is_some();
                    // The custom provider is skipped only when config loading is
                    // enabled and neither static credentials nor a role ARN are
                    // configured.
                    let should_configure_glue_provider = !enable_config_load
                        || has_glue_credentials
                        || self.glue_iam_role_arn.is_some();

                    if should_configure_glue_provider {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        if let Some(region) = self.glue_region() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.region".to_owned(),
                                region.to_owned(),
                            );
                        }
                        if let Some(access_key) = glue_access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.to_owned(),
                            );
                        }
                        if let Some(secret_key) = glue_secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.to_owned(),
                            );
                        }
                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
                                role_arn.to_owned(),
                            );
                        }
                        // Reached only with a role ARN but no complete static
                        // key pair while config loading is enabled.
                        if enable_config_load && !has_glue_credentials {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.use-default-credential-chain"
                                    .to_owned(),
                                "true".to_owned(),
                            );
                        }
                    }

                    // Overrides the `client.region` set from `s3.region` above.
                    if let Some(region) = self.glue_region() {
                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
574}
575
576impl IcebergCommon {
    /// Creates the Iceberg catalog selected by `catalog_type()`.
    ///
    /// Dispatch:
    /// * `storage` — a `StorageCatalog` whose backend (S3, GCS or azblob) is
    ///   chosen from the URL scheme of `warehouse.path`.
    /// * `rest_rust` — the Rust REST catalog; requires `catalog.uri`.
    /// * `glue_rust` — the Rust Glue catalog; requires `warehouse.path`.
    /// * `hive` / `snowflake` / `jdbc` / `rest` / `glue` — a Java catalog
    ///   reached through JNI, configured by `build_jni_catalog_configs`.
    /// * `mock` — a `MockCatalog`.
    ///
    /// `java_catalog_props` is only consulted on the JNI path.
    ///
    /// # Errors
    /// Fails when a required option for the chosen catalog is missing, when the
    /// warehouse path cannot be parsed or has an unsupported scheme, when the
    /// underlying catalog constructor fails, or when the catalog type is not
    /// one of the above.
    pub async fn create_catalog(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<Arc<dyn Catalog>> {
        match self.catalog_type() {
            "storage" => {
                let warehouse = self
                    .warehouse_path
                    .clone()
                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
                // NOTE(review): the underlying URL parse error is discarded
                // here, unlike in `build_jni_catalog_configs`.
                let url = Url::parse(warehouse.as_ref())
                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;

                // The URL scheme selects the storage backend. `enable_config_load`
                // is passed for S3 and GCS but not for azblob.
                let config = match url.scheme() {
                    "s3" | "s3a" => StorageCatalogConfig::S3(
                        storage_catalog::StorageCatalogS3Config::builder()
                            .warehouse(warehouse)
                            .access_key(self.s3_access_key.clone())
                            .secret_key(self.s3_secret_key.clone())
                            .region(self.s3_region.clone())
                            .endpoint(self.s3_endpoint.clone())
                            .path_style_access(self.s3_path_style_access)
                            .enable_config_load(Some(self.enable_config_load()))
                            .build(),
                    ),
                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
                        storage_catalog::StorageCatalogGcsConfig::builder()
                            .warehouse(warehouse)
                            .credential(self.gcs_credential.clone())
                            .enable_config_load(Some(self.enable_config_load()))
                            .build(),
                    ),
                    "azblob" => StorageCatalogConfig::Azblob(
                        storage_catalog::StorageCatalogAzblobConfig::builder()
                            .warehouse(warehouse)
                            .account_name(self.azblob_account_name.clone())
                            .account_key(self.azblob_account_key.clone())
                            .endpoint(self.azblob_endpoint_url.clone())
                            .build(),
                    ),
                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
                };

                let catalog = storage_catalog::StorageCatalog::new(config)?;
                Ok(Arc::new(catalog))
            }
            "rest_rust" => {
                let mut iceberg_configs = HashMap::new();

                // A GCS credential takes precedence: when it is set, none of
                // the S3 options are forwarded.
                if let Some(gcs_credential) = &self.gcs_credential {
                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                } else {
                    if let Some(region) = &self.s3_region {
                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
                    }
                    if let Some(endpoint) = &self.s3_endpoint {
                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
                    }
                    if let Some(access_key) = &self.s3_access_key {
                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
                    }
                    if let Some(secret_key) = &self.s3_secret_key {
                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                    }
                    if let Some(path_style_access) = &self.s3_path_style_access {
                        iceberg_configs.insert(
                            S3_PATH_STYLE_ACCESS.to_owned(),
                            path_style_access.to_string(),
                        );
                    }
                };

                if let Some(credential) = &self.catalog_credential {
                    iceberg_configs.insert("credential".to_owned(), credential.clone());
                }
                if let Some(token) = &self.catalog_token {
                    iceberg_configs.insert("token".to_owned(), token.clone());
                }
                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                    iceberg_configs
                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                }
                if let Some(scope) = &self.catalog_scope {
                    iceberg_configs.insert("scope".to_owned(), scope.clone());
                }

                // Each HTTP header becomes a `header.<name>` catalog property.
                let headers = self.headers()?;
                for (header_name, header_value) in headers {
                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
                }

                let config_builder =
                    iceberg_catalog_rest::RestCatalogConfig::builder()
                        .uri(self.catalog_uri.clone().with_context(|| {
                            "`catalog.uri` must be set in rest catalog".to_owned()
                        })?)
                        .props(iceberg_configs);

                // The warehouse is optional for the REST catalog.
                let config = match &self.warehouse_path {
                    Some(warehouse_path) => {
                        config_builder.warehouse(warehouse_path.clone()).build()
                    }
                    None => config_builder.build(),
                };
                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
                Ok(Arc::new(catalog))
            }
            "glue_rust" => {
                let mut iceberg_configs = HashMap::new();
                // Glue client settings (with fallback to the S3 options).
                if let Some(region) = self.glue_region() {
                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
                }
                if let Some(access_key) = self.glue_access_key() {
                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
                }
                if let Some(secret_key) = self.glue_secret_key() {
                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
                }
                // S3 file IO settings.
                if let Some(region) = &self.s3_region {
                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
                }
                if let Some(endpoint) = &self.s3_endpoint {
                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
                }
                if let Some(access_key) = &self.s3_access_key {
                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
                }
                if let Some(secret_key) = &self.s3_secret_key {
                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                }
                if let Some(role_arn) = &self.s3_iam_role_arn {
                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
                }
                if let Some(path_style_access) = &self.s3_path_style_access {
                    iceberg_configs.insert(
                        S3_PATH_STYLE_ACCESS.to_owned(),
                        path_style_access.to_string(),
                    );
                }
                let config_builder =
                    iceberg_catalog_glue::GlueCatalogConfig::builder()
                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
                            anyhow!("`warehouse.path` must be set in glue catalog")
                        })?)
                        .props(iceberg_configs);
                // The catalog URI is optional for Glue.
                let config = if let Some(uri) = self.catalog_uri.as_deref() {
                    config_builder.uri(uri.to_owned()).build()
                } else {
                    config_builder.build()
                };
                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
                Ok(Arc::new(catalog))
            }
            // Catalogs implemented in Java and reached through JNI.
            catalog_type
                if catalog_type == "hive"
                    || catalog_type == "snowflake"
                    || catalog_type == "jdbc"
                    || catalog_type == "rest"
                    || catalog_type == "glue" =>
            {
                // Shadows the parameter with the fully merged Java properties.
                let (file_io_props, java_catalog_props) =
                    self.build_jni_catalog_configs(java_catalog_props)?;
                let catalog_impl = match catalog_type {
                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
                    // The arm guard above admits only the five names handled here.
                    _ => unreachable!(),
                };

                jni_catalog::JniCatalog::build_catalog(
                    file_io_props,
                    self.catalog_name(),
                    catalog_impl,
                    java_catalog_props,
                )
            }
            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
            _ => {
                // NOTE(review): the message does not mention `glue_rust` or
                // `mock`, which are accepted above.
                bail!(
                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
                    self.catalog_type()
                )
            }
        }
    }
769
770 pub async fn load_table(
772 &self,
773 table: &IcebergTableIdentifier,
774 java_catalog_props: &HashMap<String, String>,
775 ) -> ConnectorResult<Table> {
776 let catalog = self
777 .create_catalog(java_catalog_props)
778 .await
779 .context("Unable to load iceberg catalog")?;
780
781 let table_id = table
782 .to_table_ident()
783 .context("Unable to parse table name")?;
784
785 catalog.load_table(&table_id).await.map_err(Into::into)
786 }
787}