1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::{
30 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
31 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
32};
33use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
34use phf::{Set, phf_set};
35use risingwave_common::bail;
36use risingwave_common::error::IcebergError;
37use risingwave_common::util::deployment::Deployment;
38use risingwave_common::util::env_var::env_var_is_true;
39use serde::Deserialize;
40use serde_with::serde_as;
41use url::Url;
42use with_options::WithOptions;
43
44use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
45use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
46use crate::deserialize_optional_bool_from_string;
47use crate::enforce_secret::EnforceSecret;
48use crate::error::ConnectorResult;
49
// Connection options shared by Iceberg sources and sinks. Each field maps to a
// `WITH (...)` key given by its `serde(rename = ...)` attribute (or the field name
// when there is no rename).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog implementation; `IcebergCommon::catalog_type()` falls back to
    // `storage` when unset.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 FileIO settings.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue catalog settings; access key, secret key and region fall back to the
    // corresponding `s3.*` values when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential, passed through as the GCS credentials JSON property.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings; for JNI catalogs they are only applied when all three
    // are set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    // Warehouse location; required unless the catalog is a REST catalog.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    // Defaults to `risingwave` via `IcebergCommon::catalog_name()`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    // REST catalog authentication options.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    // REST catalog SigV4 request-signing options.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    // Whether default credential/config loading is allowed; forced off when the
    // DISABLE_DEFAULT_CREDENTIAL environment variable is set.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    // Extra catalog HTTP headers as `;`-separated `key=value` pairs.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    // When true, requests vended credentials from the catalog and routes `rest`
    // catalogs to the Rust REST client.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
171
172impl EnforceSecret for IcebergCommon {
173 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
174 "s3.access.key",
175 "s3.secret.key",
176 "gcs.credential",
177 "catalog.credential",
178 "catalog.token",
179 "catalog.oauth2_server_uri",
180 "adlsgen2.account_key",
181 "adlsgen2.client_secret",
182 "glue.access.key",
183 "glue.secret.key",
184 };
185}
186
// Identifies a single Iceberg table. `deny_unknown_fields` makes deserialization
// reject any keys other than `database.name` and `table.name`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional namespace; when absent the identifier has only the table name.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    #[serde(rename = "table.name")]
    pub table_name: String,
}
197
198impl IcebergTableIdentifier {
199 pub fn database_name(&self) -> Option<&str> {
200 self.database_name.as_deref()
201 }
202
203 pub fn table_name(&self) -> &str {
204 &self.table_name
205 }
206
207 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
208 let ret = if let Some(database_name) = &self.database_name {
209 TableIdent::from_strs(vec![database_name, &self.table_name])
210 } else {
211 TableIdent::from_strs(vec![&self.table_name])
212 };
213
214 Ok(ret.context("Failed to create table identifier")?)
215 }
216}
217
218impl IcebergCommon {
219 pub fn catalog_type(&self) -> &str {
220 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
221 if self.vended_credentials() && catalog_type == "rest" {
222 "rest_rust"
223 } else {
224 catalog_type
225 }
226 }
227
228 pub fn vended_credentials(&self) -> bool {
229 self.vended_credentials.unwrap_or(false)
230 }
231
232 fn glue_access_key(&self) -> Option<&str> {
233 self.glue_access_key
234 .as_deref()
235 .or(self.s3_access_key.as_deref())
236 }
237
238 fn glue_secret_key(&self) -> Option<&str> {
239 self.glue_secret_key
240 .as_deref()
241 .or(self.s3_secret_key.as_deref())
242 }
243
244 fn glue_region(&self) -> Option<&str> {
245 self.glue_region.as_deref().or(self.s3_region.as_deref())
246 }
247
248 pub fn catalog_name(&self) -> String {
249 self.catalog_name
250 .as_ref()
251 .cloned()
252 .unwrap_or_else(|| "risingwave".to_owned())
253 }
254
255 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
256 let mut headers = HashMap::new();
257 let user_agent = match Deployment::current() {
258 Deployment::Ci => "RisingWave(CI)".to_owned(),
259 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
260 Deployment::Other => "RisingWave(OSS)".to_owned(),
261 };
262 if self.vended_credentials() {
263 headers.insert(
264 "X-Iceberg-Access-Delegation".to_owned(),
265 "vended-credentials".to_owned(),
266 );
267 }
268 headers.insert("User-Agent".to_owned(), user_agent);
269 if let Some(header) = &self.catalog_header {
270 for pair in header.split(';') {
271 let mut parts = pair.split('=');
272 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
273 headers.insert(key.to_owned(), value.to_owned());
274 } else {
275 bail!("Invalid header format: {}", pair);
276 }
277 }
278 }
279 Ok(headers)
280 }
281
282 pub fn enable_config_load(&self) -> bool {
283 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
285 if matches!(self.enable_config_load, Some(true)) {
286 tracing::warn!(
287 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
288 );
289 }
290 return false;
291 }
292 self.enable_config_load.unwrap_or(false)
293 }
294
295 fn build_jni_catalog_configs(
297 &self,
298 java_catalog_props: &HashMap<String, String>,
299 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
300 let mut iceberg_configs = HashMap::new();
301 let enable_config_load = self.enable_config_load();
302 let file_io_props = {
303 let catalog_type = self.catalog_type().to_owned();
304
305 if let Some(region) = &self.s3_region {
306 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
308 }
309
310 if let Some(endpoint) = &self.s3_endpoint {
311 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
313 }
314
315 if let Some(access_key) = &self.s3_access_key {
317 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
318 }
319 if let Some(secret_key) = &self.s3_secret_key {
320 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
321 }
322 if let Some(role_arn) = &self.s3_iam_role_arn {
323 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
324 }
325 if let Some(gcs_credential) = &self.gcs_credential {
326 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
327 if catalog_type != "rest" && catalog_type != "rest_rust" {
328 bail!("gcs unsupported in {} catalog", &catalog_type);
329 }
330 }
331
332 if let (
333 Some(azblob_account_name),
334 Some(azblob_account_key),
335 Some(azblob_endpoint_url),
336 ) = (
337 &self.azblob_account_name,
338 &self.azblob_account_key,
339 &self.azblob_endpoint_url,
340 ) {
341 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
342 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
343 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
344
345 if catalog_type != "rest" && catalog_type != "rest_rust" {
346 bail!("azblob unsupported in {} catalog", &catalog_type);
347 }
348 }
349
350 if let (Some(account_name), Some(account_key)) = (
351 self.adlsgen2_account_name.as_ref(),
352 self.adlsgen2_account_key.as_ref(),
353 ) {
354 iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
355 iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
356 if catalog_type != "rest" && catalog_type != "rest_rust" {
357 bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
358 }
359 }
360
361 match &self.warehouse_path {
362 Some(warehouse_path) => {
363 let (bucket, _) = {
364 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
365 let url = Url::parse(warehouse_path);
366 if (url.is_err() || is_s3_tables)
367 && (catalog_type == "rest" || catalog_type == "rest_rust")
368 {
369 (None, None)
373 } else {
374 let url = url.with_context(|| {
375 format!("Invalid warehouse path: {}", warehouse_path)
376 })?;
377 let bucket = url
378 .host_str()
379 .with_context(|| {
380 format!(
381 "Invalid s3 path: {}, bucket is missing",
382 warehouse_path
383 )
384 })?
385 .to_owned();
386 let root = url.path().trim_start_matches('/').to_owned();
387 (Some(bucket), Some(root))
388 }
389 };
390
391 if let Some(bucket) = bucket {
392 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
393 }
394 }
395 None => {
396 if catalog_type != "rest" && catalog_type != "rest_rust" {
397 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
398 }
399 }
400 }
401 iceberg_configs.insert(
402 S3_DISABLE_CONFIG_LOAD.to_owned(),
403 (!enable_config_load).to_string(),
404 );
405
406 iceberg_configs.insert(
407 GCS_DISABLE_CONFIG_LOAD.to_owned(),
408 (!enable_config_load).to_string(),
409 );
410
411 if let Some(path_style_access) = self.s3_path_style_access {
412 iceberg_configs.insert(
413 S3_PATH_STYLE_ACCESS.to_owned(),
414 path_style_access.to_string(),
415 );
416 }
417
418 iceberg_configs
419 };
420
421 let mut java_catalog_configs = HashMap::new();
423 {
424 if let Some(uri) = self.catalog_uri.as_deref() {
425 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
426 }
427
428 if let Some(warehouse_path) = &self.warehouse_path {
429 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
430 }
431 java_catalog_configs.extend(java_catalog_props.clone());
432
433 java_catalog_configs.insert(
435 "io-impl".to_owned(),
436 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
437 );
438
439 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
441
442 if let Some(region) = &self.s3_region {
443 java_catalog_configs.insert("client.region".to_owned(), region.clone());
444 }
445 if let Some(endpoint) = &self.s3_endpoint {
446 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
447 }
448
449 if let Some(access_key) = &self.s3_access_key {
450 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
451 }
452 if let Some(secret_key) = &self.s3_secret_key {
453 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
454 }
455
456 if let Some(path_style_access) = &self.s3_path_style_access {
457 java_catalog_configs.insert(
458 "s3.path-style-access".to_owned(),
459 path_style_access.to_string(),
460 );
461 }
462
463 let headers = self.headers()?;
464 for (header_name, header_value) in headers {
465 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
466 }
467
468 match self.catalog_type() {
469 "rest" => {
470 if let Some(credential) = &self.catalog_credential {
471 java_catalog_configs.insert("credential".to_owned(), credential.clone());
472 }
473 if let Some(token) = &self.catalog_token {
474 java_catalog_configs.insert("token".to_owned(), token.clone());
475 }
476 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
477 java_catalog_configs
478 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
479 }
480 if let Some(scope) = &self.catalog_scope {
481 java_catalog_configs.insert("scope".to_owned(), scope.clone());
482 }
483 if let Some(rest_signing_region) = &self.rest_signing_region {
484 java_catalog_configs.insert(
485 "rest.signing-region".to_owned(),
486 rest_signing_region.clone(),
487 );
488 }
489 if let Some(rest_signing_name) = &self.rest_signing_name {
490 java_catalog_configs
491 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
492 }
493 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
494 java_catalog_configs.insert(
495 "rest.sigv4-enabled".to_owned(),
496 rest_sigv4_enabled.to_string(),
497 );
498
499 if let Some(access_key) = &self.s3_access_key {
500 java_catalog_configs
501 .insert("rest.access-key-id".to_owned(), access_key.clone());
502 }
503
504 if let Some(secret_key) = &self.s3_secret_key {
505 java_catalog_configs
506 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
507 }
508 }
509 }
510 "glue" => {
511 let glue_access_key = self.glue_access_key();
512 let glue_secret_key = self.glue_secret_key();
513 let has_glue_credentials =
514 glue_access_key.is_some() && glue_secret_key.is_some();
515 let should_configure_glue_provider = !enable_config_load
516 || has_glue_credentials
517 || self.glue_iam_role_arn.is_some();
518
519 if should_configure_glue_provider {
520 java_catalog_configs.insert(
521 "client.credentials-provider".to_owned(),
522 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
523 );
524 if let Some(region) = self.glue_region() {
525 java_catalog_configs.insert(
526 "client.credentials-provider.glue.region".to_owned(),
527 region.to_owned(),
528 );
529 }
530 if let Some(access_key) = glue_access_key {
531 java_catalog_configs.insert(
532 "client.credentials-provider.glue.access-key-id".to_owned(),
533 access_key.to_owned(),
534 );
535 }
536 if let Some(secret_key) = glue_secret_key {
537 java_catalog_configs.insert(
538 "client.credentials-provider.glue.secret-access-key".to_owned(),
539 secret_key.to_owned(),
540 );
541 }
542 if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
543 java_catalog_configs.insert(
544 "client.credentials-provider.glue.iam-role-arn".to_owned(),
545 role_arn.to_owned(),
546 );
547 }
548 if enable_config_load && !has_glue_credentials {
549 java_catalog_configs.insert(
550 "client.credentials-provider.glue.use-default-credential-chain"
551 .to_owned(),
552 "true".to_owned(),
553 );
554 }
555 }
556
557 if let Some(region) = self.glue_region() {
558 java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
559 java_catalog_configs.insert(
560 "glue.endpoint".to_owned(),
561 format!("https://glue.{}.amazonaws.com", region),
562 );
563 }
564
565 if let Some(glue_id) = self.glue_id.as_deref() {
566 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
567 }
568 }
569 _ => {}
570 }
571 }
572
573 Ok((file_io_props, java_catalog_configs))
574 }
575}
576
577impl IcebergCommon {
578 pub async fn create_catalog(
580 &self,
581 java_catalog_props: &HashMap<String, String>,
582 ) -> ConnectorResult<Arc<dyn Catalog>> {
583 match self.catalog_type() {
584 "storage" => {
585 let warehouse = self
586 .warehouse_path
587 .clone()
588 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
589 let url = Url::parse(warehouse.as_ref())
590 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
591
592 let config = match url.scheme() {
593 "s3" | "s3a" => StorageCatalogConfig::S3(
594 storage_catalog::StorageCatalogS3Config::builder()
595 .warehouse(warehouse)
596 .access_key(self.s3_access_key.clone())
597 .secret_key(self.s3_secret_key.clone())
598 .region(self.s3_region.clone())
599 .endpoint(self.s3_endpoint.clone())
600 .path_style_access(self.s3_path_style_access)
601 .enable_config_load(Some(self.enable_config_load()))
602 .build(),
603 ),
604 "gs" | "gcs" => StorageCatalogConfig::Gcs(
605 storage_catalog::StorageCatalogGcsConfig::builder()
606 .warehouse(warehouse)
607 .credential(self.gcs_credential.clone())
608 .enable_config_load(Some(self.enable_config_load()))
609 .build(),
610 ),
611 "azblob" => StorageCatalogConfig::Azblob(
612 storage_catalog::StorageCatalogAzblobConfig::builder()
613 .warehouse(warehouse)
614 .account_name(self.azblob_account_name.clone())
615 .account_key(self.azblob_account_key.clone())
616 .endpoint(self.azblob_endpoint_url.clone())
617 .build(),
618 ),
619 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
620 };
621
622 let catalog = storage_catalog::StorageCatalog::new(config)?;
623 Ok(Arc::new(catalog))
624 }
625 "rest_rust" => {
626 let mut iceberg_configs = HashMap::new();
627
628 if let Some(gcs_credential) = &self.gcs_credential {
630 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
631 } else {
632 if let Some(region) = &self.s3_region {
633 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
634 }
635 if let Some(endpoint) = &self.s3_endpoint {
636 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
637 }
638 if let Some(access_key) = &self.s3_access_key {
639 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
640 }
641 if let Some(secret_key) = &self.s3_secret_key {
642 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
643 }
644 if let Some(path_style_access) = &self.s3_path_style_access {
645 iceberg_configs.insert(
646 S3_PATH_STYLE_ACCESS.to_owned(),
647 path_style_access.to_string(),
648 );
649 }
650 };
651
652 if let Some(credential) = &self.catalog_credential {
653 iceberg_configs.insert("credential".to_owned(), credential.clone());
654 }
655 if let Some(token) = &self.catalog_token {
656 iceberg_configs.insert("token".to_owned(), token.clone());
657 }
658 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
659 iceberg_configs
660 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
661 }
662 if let Some(scope) = &self.catalog_scope {
663 iceberg_configs.insert("scope".to_owned(), scope.clone());
664 }
665
666 let headers = self.headers()?;
667 for (header_name, header_value) in headers {
668 iceberg_configs.insert(format!("header.{}", header_name), header_value);
669 }
670
671 iceberg_configs.insert(
672 iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
673 self.catalog_uri
674 .clone()
675 .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
676 );
677 if let Some(warehouse_path) = &self.warehouse_path {
678 iceberg_configs.insert(
679 iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
680 warehouse_path.clone(),
681 );
682 }
683 let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
684 .load("rest", iceberg_configs)
685 .await
686 .map_err(|e| anyhow!(IcebergError::from(e)))?;
687 Ok(Arc::new(catalog))
688 }
689 "glue_rust" => {
690 let mut iceberg_configs = HashMap::new();
691 if let Some(region) = self.glue_region() {
693 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
694 }
695 if let Some(access_key) = self.glue_access_key() {
696 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
697 }
698 if let Some(secret_key) = self.glue_secret_key() {
699 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
700 }
701 if let Some(region) = &self.s3_region {
703 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
704 }
705 if let Some(endpoint) = &self.s3_endpoint {
706 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
707 }
708 if let Some(access_key) = &self.s3_access_key {
709 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
710 }
711 if let Some(secret_key) = &self.s3_secret_key {
712 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
713 }
714 if let Some(role_arn) = &self.s3_iam_role_arn {
715 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
716 }
717 if let Some(path_style_access) = &self.s3_path_style_access {
718 iceberg_configs.insert(
719 S3_PATH_STYLE_ACCESS.to_owned(),
720 path_style_access.to_string(),
721 );
722 }
723 iceberg_configs.insert(
724 iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
725 self.warehouse_path
726 .clone()
727 .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
728 );
729 if let Some(uri) = self.catalog_uri.as_deref() {
730 iceberg_configs.insert(
731 iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
732 uri.to_owned(),
733 );
734 }
735 let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
736 .load("glue", iceberg_configs)
737 .await
738 .map_err(|e| anyhow!(IcebergError::from(e)))?;
739 Ok(Arc::new(catalog))
740 }
741 catalog_type
742 if catalog_type == "hive"
743 || catalog_type == "snowflake"
744 || catalog_type == "jdbc"
745 || catalog_type == "rest"
746 || catalog_type == "glue" =>
747 {
748 let (file_io_props, java_catalog_props) =
750 self.build_jni_catalog_configs(java_catalog_props)?;
751 let catalog_impl = match catalog_type {
752 "hive" => "org.apache.iceberg.hive.HiveCatalog",
753 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
754 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
755 "rest" => "org.apache.iceberg.rest.RESTCatalog",
756 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
757 _ => unreachable!(),
758 };
759
760 jni_catalog::JniCatalog::build_catalog(
761 file_io_props,
762 self.catalog_name(),
763 catalog_impl,
764 java_catalog_props,
765 )
766 }
767 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
768 _ => {
769 bail!(
770 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
771 self.catalog_type()
772 )
773 }
774 }
775 }
776
777 pub async fn load_table(
779 &self,
780 table: &IcebergTableIdentifier,
781 java_catalog_props: &HashMap<String, String>,
782 ) -> ConnectorResult<Table> {
783 let catalog = self
784 .create_catalog(java_catalog_props)
785 .await
786 .context("Unable to load iceberg catalog")?;
787
788 let table_id = table
789 .to_table_ident()
790 .context("Unable to parse table name")?;
791
792 catalog.load_table(&table_id).await.map_err(Into::into)
793 }
794}