1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
/// Common Iceberg connector options: catalog selection, object-storage
/// credentials, and REST / Glue catalog settings.
///
/// Every field is optional at deserialization time. Which ones are actually
/// required depends on the resolved catalog type; see
/// [`IcebergCommon::create_catalog`].
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type (`catalog.type`). When unset, [`IcebergCommon::catalog_type`]
    /// falls back to `"storage"`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region (`s3.region`). Also used as the fallback for the Glue region.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    /// S3 endpoint (`s3.endpoint`).
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    /// S3 access key (`s3.access.key`). Also used as the fallback for the Glue
    /// access key.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    /// S3 secret key (`s3.secret.key`). Also used as the fallback for the Glue
    /// secret key.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    /// IAM role ARN (`s3.iam_role_arn`), forwarded under `S3_ASSUME_ROLE_ARN`.
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    /// Glue access key (`glue.access.key`); falls back to `s3.access.key`.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    /// Glue secret key (`glue.secret.key`); falls back to `s3.secret.key`.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    /// IAM role ARN handed to the Java Glue credential provider
    /// (`glue.iam_role_arn`).
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    /// Glue region (`glue.region`); falls back to `s3.region`.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// Forwarded to the Java Glue catalog as `glue.id`.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    /// GCS credential (`gcs.credential`), forwarded under `GCS_CREDENTIALS_JSON`.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name (`azblob.account_name`).
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key (`azblob.account_key`).
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint (`azblob.endpoint_url`).
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// ADLS Gen2 account name (`adlsgen2.account_name`).
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    /// ADLS Gen2 account key (`adlsgen2.account_key`).
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    /// ADLS Gen2 endpoint (`adlsgen2.endpoint`).
    /// NOTE(review): not read by the code in this file — presumably consumed
    /// elsewhere; confirm.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Warehouse location (`warehouse.path`). Required for the `storage` and
    /// `glue_rust` catalogs and for every JNI catalog except `rest`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name (`catalog.name`); [`IcebergCommon::catalog_name`] defaults
    /// it to `"risingwave"`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog URI (`catalog.uri`). Required for the `rest_rust` catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Forwarded to REST catalogs as `credential`.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// Forwarded to REST catalogs as `token`.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// Forwarded to REST catalogs as `oauth2-server-uri`.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// Forwarded to REST catalogs as `scope`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-region`.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-name`.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.sigv4-enabled`. Whenever
    /// this is set, the S3 access/secret keys are also forwarded as
    /// `rest.access-key-id` / `rest.secret-access-key`.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// S3 path-style access toggle (`s3.path.style.access`).
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Requested value for config loading; the effective value comes from
    /// [`IcebergCommon::enable_config_load`], which defaults to `false` and is
    /// forced to `false` when the `DISABLE_DEFAULT_CREDENTIAL` env var is true.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// Parsed from `hosted_catalog`.
    /// NOTE(review): not read by the code in this file — presumably consumed
    /// by callers; confirm.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// Extra catalog request headers (`catalog.header`), written as
    /// `key=value` pairs separated by `;`. Parsed by [`IcebergCommon::headers`].
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// When true, a `rest` catalog is resolved to `rest_rust` and the
    /// `X-Iceberg-Access-Delegation: vended-credentials` header is sent.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
174
/// 32 MiB. Presumably the size of a single table's `ObjectCache` — TODO confirm
/// against the iceberg crate's default.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
/// 512 MiB: total budget used to derive how many per-table caches are retained.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
/// Capacity of the shared per-table cache registry (budget / per-table size = 16).
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
181
182impl EnforceSecret for IcebergCommon {
183 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
184 "s3.access.key",
185 "s3.secret.key",
186 "gcs.credential",
187 "catalog.credential",
188 "catalog.token",
189 "catalog.oauth2_server_uri",
190 "adlsgen2.account_key",
191 "adlsgen2.client_secret",
192 "glue.access.key",
193 "glue.secret.key",
194 };
195}
196
/// Identifies an Iceberg table by an optional database name plus a table name.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    /// Optional database name (`database.name`); when present it is placed
    /// before the table name in the resulting `TableIdent`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name (`table.name`); required.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
207
208impl IcebergTableIdentifier {
209 pub fn database_name(&self) -> Option<&str> {
210 self.database_name.as_deref()
211 }
212
213 pub fn table_name(&self) -> &str {
214 &self.table_name
215 }
216
217 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
218 let ret = if let Some(database_name) = &self.database_name {
219 TableIdent::from_strs(vec![database_name, &self.table_name])
220 } else {
221 TableIdent::from_strs(vec![&self.table_name])
222 };
223
224 Ok(ret.context("Failed to create table identifier")?)
225 }
226}
227
228impl IcebergCommon {
229 pub fn catalog_type(&self) -> &str {
230 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
231 if self.vended_credentials() && catalog_type == "rest" {
232 "rest_rust"
233 } else {
234 catalog_type
235 }
236 }
237
238 pub fn vended_credentials(&self) -> bool {
239 self.vended_credentials.unwrap_or(false)
240 }
241
242 fn glue_access_key(&self) -> Option<&str> {
243 self.glue_access_key
244 .as_deref()
245 .or(self.s3_access_key.as_deref())
246 }
247
248 fn glue_secret_key(&self) -> Option<&str> {
249 self.glue_secret_key
250 .as_deref()
251 .or(self.s3_secret_key.as_deref())
252 }
253
254 fn glue_region(&self) -> Option<&str> {
255 self.glue_region.as_deref().or(self.s3_region.as_deref())
256 }
257
258 pub fn catalog_name(&self) -> String {
259 self.catalog_name
260 .as_ref()
261 .cloned()
262 .unwrap_or_else(|| "risingwave".to_owned())
263 }
264
265 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
266 let mut headers = HashMap::new();
267 let user_agent = match Deployment::current() {
268 Deployment::Ci => "RisingWave(CI)".to_owned(),
269 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
270 Deployment::Other => "RisingWave(OSS)".to_owned(),
271 };
272 if self.vended_credentials() {
273 headers.insert(
274 "X-Iceberg-Access-Delegation".to_owned(),
275 "vended-credentials".to_owned(),
276 );
277 }
278 headers.insert("User-Agent".to_owned(), user_agent);
279 if let Some(header) = &self.catalog_header {
280 for pair in header.split(';') {
281 let mut parts = pair.split('=');
282 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
283 headers.insert(key.to_owned(), value.to_owned());
284 } else {
285 bail!("Invalid header format: {}", pair);
286 }
287 }
288 }
289 Ok(headers)
290 }
291
292 pub fn enable_config_load(&self) -> bool {
293 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
295 if matches!(self.enable_config_load, Some(true)) {
296 tracing::warn!(
297 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
298 );
299 }
300 return false;
301 }
302 self.enable_config_load.unwrap_or(false)
303 }
304
    /// Builds the two property maps needed to create a JNI (Java) catalog.
    ///
    /// Returns `(file_io_props, java_catalog_configs)`:
    /// - `file_io_props` holds the file-IO properties (S3 / GCS / Azure
    ///   credentials, config-load switches, path-style access, table bucket).
    /// - `java_catalog_configs` holds the properties handed to the Java catalog
    ///   implementation.
    ///
    /// `java_catalog_props` is merged after `uri` and `warehouse`, so it can
    /// override those two; every key inserted afterwards (for example
    /// `io-impl`, `client.region`, `s3.*`, `header.*`) overrides a
    /// caller-supplied value with the same key.
    ///
    /// # Errors
    /// - GCS, azblob or adlsgen2 options are set while the catalog type is
    ///   neither `rest` nor `rest_rust`.
    /// - `warehouse.path` is missing for a non-REST catalog, is not a valid URL,
    ///   or has no host component.
    /// - `catalog.header` is malformed (see [`IcebergCommon::headers`]).
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        // Part 1: file-IO properties.
        let file_io_props = {
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.s3_region {
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.s3_endpoint {
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(role_arn) = &self.s3_iam_role_arn {
                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
            }
            // GCS, azblob and adlsgen2 are only accepted for REST catalogs.
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            // The azblob options are applied only when all three are present.
            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            // The adlsgen2 options are applied only when both name and key are present.
            if let (Some(account_name), Some(account_key)) = (
                self.adlsgen2_account_name.as_ref(),
                self.adlsgen2_account_key.as_ref(),
            ) {
                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    // Only the bucket is used; the path component (`root`) is
                    // computed below but discarded by this binding.
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        // For REST catalogs, a warehouse that is not a URL, or
                        // that is an S3 Tables ARN, yields no bucket instead of
                        // an error.
                        if (url.is_err() || is_s3_tables)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    // Only REST catalogs may omit the warehouse path.
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            // The file-IO switches are "disable" flags, hence the negation.
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.s3_path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        // Part 2: properties for the Java catalog implementation.
        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            // Caller-supplied props may override `uri` / `warehouse` above, but
            // are themselves overridden by the inserts that follow.
            java_catalog_configs.extend(java_catalog_props.clone());

            java_catalog_configs.insert(
                "io-impl".to_owned(),
                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
            );

            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            if let Some(region) = &self.s3_region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.s3_endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.s3_path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            // Each HTTP header is passed as a `header.<name>` property.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            match self.catalog_type() {
                "rest" => {
                    if let Some(credential) = &self.catalog_credential {
                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.catalog_token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.catalog_scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    // The signing credentials are forwarded whenever the sigv4
                    // option is present, whether it is `true` or `false`.
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.s3_access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.s3_secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                "glue" => {
                    let glue_access_key = self.glue_access_key();
                    let glue_secret_key = self.glue_secret_key();
                    let has_glue_credentials =
                        glue_access_key.is_some() && glue_secret_key.is_some();
                    // Install the custom credential provider unless config
                    // loading is enabled and neither static credentials nor a
                    // role ARN were given.
                    let should_configure_glue_provider = !enable_config_load
                        || has_glue_credentials
                        || self.glue_iam_role_arn.is_some();

                    if should_configure_glue_provider {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        if let Some(region) = self.glue_region() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.region".to_owned(),
                                region.to_owned(),
                            );
                        }
                        if let Some(access_key) = glue_access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.to_owned(),
                            );
                        }
                        if let Some(secret_key) = glue_secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.to_owned(),
                            );
                        }
                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
                                role_arn.to_owned(),
                            );
                        }
                        // Reached only via the role-ARN case: config loading is
                        // on and no complete static key pair is available.
                        if enable_config_load && !has_glue_credentials {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.use-default-credential-chain"
                                    .to_owned(),
                                "true".to_owned(),
                            );
                        }
                    }

                    // Overrides the `client.region` set from `s3.region` above.
                    if let Some(region) = self.glue_region() {
                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
585}
586
587impl IcebergCommon {
588 pub async fn create_catalog(
590 &self,
591 java_catalog_props: &HashMap<String, String>,
592 ) -> ConnectorResult<Arc<dyn Catalog>> {
593 match self.catalog_type() {
594 "storage" => {
595 let warehouse = self
596 .warehouse_path
597 .clone()
598 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
599 let url = Url::parse(warehouse.as_ref())
600 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
601
602 let config = match url.scheme() {
603 "s3" | "s3a" => StorageCatalogConfig::S3(
604 storage_catalog::StorageCatalogS3Config::builder()
605 .warehouse(warehouse)
606 .access_key(self.s3_access_key.clone())
607 .secret_key(self.s3_secret_key.clone())
608 .region(self.s3_region.clone())
609 .endpoint(self.s3_endpoint.clone())
610 .path_style_access(self.s3_path_style_access)
611 .enable_config_load(Some(self.enable_config_load()))
612 .build(),
613 ),
614 "gs" | "gcs" => StorageCatalogConfig::Gcs(
615 storage_catalog::StorageCatalogGcsConfig::builder()
616 .warehouse(warehouse)
617 .credential(self.gcs_credential.clone())
618 .enable_config_load(Some(self.enable_config_load()))
619 .build(),
620 ),
621 "azblob" => StorageCatalogConfig::Azblob(
622 storage_catalog::StorageCatalogAzblobConfig::builder()
623 .warehouse(warehouse)
624 .account_name(self.azblob_account_name.clone())
625 .account_key(self.azblob_account_key.clone())
626 .endpoint(self.azblob_endpoint_url.clone())
627 .build(),
628 ),
629 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
630 };
631
632 let catalog = storage_catalog::StorageCatalog::new(config)?;
633 Ok(Arc::new(catalog))
634 }
635 "rest_rust" => {
636 let mut iceberg_configs = HashMap::new();
637
638 if let Some(gcs_credential) = &self.gcs_credential {
640 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
641 } else {
642 if let Some(region) = &self.s3_region {
643 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
644 }
645 if let Some(endpoint) = &self.s3_endpoint {
646 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
647 }
648 if let Some(access_key) = &self.s3_access_key {
649 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
650 }
651 if let Some(secret_key) = &self.s3_secret_key {
652 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
653 }
654 if let Some(path_style_access) = &self.s3_path_style_access {
655 iceberg_configs.insert(
656 S3_PATH_STYLE_ACCESS.to_owned(),
657 path_style_access.to_string(),
658 );
659 }
660 };
661
662 if let Some(credential) = &self.catalog_credential {
663 iceberg_configs.insert("credential".to_owned(), credential.clone());
664 }
665 if let Some(token) = &self.catalog_token {
666 iceberg_configs.insert("token".to_owned(), token.clone());
667 }
668 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
669 iceberg_configs
670 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
671 }
672 if let Some(scope) = &self.catalog_scope {
673 iceberg_configs.insert("scope".to_owned(), scope.clone());
674 }
675
676 let headers = self.headers()?;
677 for (header_name, header_value) in headers {
678 iceberg_configs.insert(format!("header.{}", header_name), header_value);
679 }
680
681 iceberg_configs.insert(
682 iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
683 self.catalog_uri
684 .clone()
685 .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
686 );
687 if let Some(warehouse_path) = &self.warehouse_path {
688 iceberg_configs.insert(
689 iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
690 warehouse_path.clone(),
691 );
692 }
693 let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
694 .load("rest", iceberg_configs)
695 .await
696 .map_err(|e| anyhow!(IcebergError::from(e)))?;
697 Ok(Arc::new(catalog))
698 }
699 "glue_rust" => {
700 let mut iceberg_configs = HashMap::new();
701 if let Some(region) = self.glue_region() {
703 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
704 }
705 if let Some(access_key) = self.glue_access_key() {
706 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
707 }
708 if let Some(secret_key) = self.glue_secret_key() {
709 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
710 }
711 if let Some(region) = &self.s3_region {
713 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
714 }
715 if let Some(endpoint) = &self.s3_endpoint {
716 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
717 }
718 if let Some(access_key) = &self.s3_access_key {
719 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
720 }
721 if let Some(secret_key) = &self.s3_secret_key {
722 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
723 }
724 if let Some(role_arn) = &self.s3_iam_role_arn {
725 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
726 }
727 if let Some(path_style_access) = &self.s3_path_style_access {
728 iceberg_configs.insert(
729 S3_PATH_STYLE_ACCESS.to_owned(),
730 path_style_access.to_string(),
731 );
732 }
733 iceberg_configs.insert(
734 iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
735 self.warehouse_path
736 .clone()
737 .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
738 );
739 if let Some(uri) = self.catalog_uri.as_deref() {
740 iceberg_configs.insert(
741 iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
742 uri.to_owned(),
743 );
744 }
745 let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
746 .load("glue", iceberg_configs)
747 .await
748 .map_err(|e| anyhow!(IcebergError::from(e)))?;
749 Ok(Arc::new(catalog))
750 }
751 catalog_type
752 if catalog_type == "hive"
753 || catalog_type == "snowflake"
754 || catalog_type == "jdbc"
755 || catalog_type == "rest"
756 || catalog_type == "glue" =>
757 {
758 let (file_io_props, java_catalog_props) =
760 self.build_jni_catalog_configs(java_catalog_props)?;
761 let catalog_impl = match catalog_type {
762 "hive" => "org.apache.iceberg.hive.HiveCatalog",
763 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
764 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
765 "rest" => "org.apache.iceberg.rest.RESTCatalog",
766 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
767 _ => unreachable!(),
768 };
769
770 jni_catalog::JniCatalog::build_catalog(
771 file_io_props,
772 self.catalog_name(),
773 catalog_impl,
774 java_catalog_props,
775 )
776 }
777 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
778 _ => {
779 bail!(
780 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
781 self.catalog_type()
782 )
783 }
784 }
785 }
786
787 pub async fn load_table(
789 &self,
790 table: &IcebergTableIdentifier,
791 java_catalog_props: &HashMap<String, String>,
792 ) -> ConnectorResult<Table> {
793 let catalog = self
794 .create_catalog(java_catalog_props)
795 .await
796 .context("Unable to load iceberg catalog")?;
797
798 let table_id = table
799 .to_table_ident()
800 .context("Unable to parse table name")?;
801
802 let table = catalog.load_table(&table_id).await?;
803 Ok(rebuild_table_with_shared_cache(table).await)
804 }
805}
806
807pub(crate) async fn shared_object_cache(
809 init_object_cache: Arc<ObjectCache>,
810 table_uuid: Uuid,
811) -> Arc<ObjectCache> {
812 static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
813 MokaCache::builder()
814 .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
815 .build()
816 });
817
818 CACHE
819 .get_with(table_uuid, async { init_object_cache })
820 .await
821}
822
823pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
824 let table_uuid = table.metadata().uuid();
825 let init_object_cache = table.object_cache();
826 let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
827 table.with_object_cache(object_cache)
828}