risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
/// Iceberg catalog and storage options, deserialized with serde from string-keyed
/// properties (each field's key is given by its `#[serde(rename = ...)]`, or by the
/// field name when no rename is present).
///
/// All fields are optional; the accessor methods on this type supply the defaults
/// (for example `catalog_type()` falls back to `"storage"` and `catalog_name()` to
/// `"risingwave"`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,

    // --- S3 storage access ---
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // --- Glue catalog access ---
    // The access key, secret key and region fall back to the matching `s3.*` option
    // when unset (see `glue_access_key()`, `glue_secret_key()`, `glue_region()`).
    // `glue.iam_role_arn` has no such fallback.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // --- GCS storage access ---
    // Forwarded as the GCS credentials JSON property of the file IO.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // --- Azure Blob storage access ---
    // `build_jni_catalog_configs` only forwards these when all three are set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // --- ADLS Gen2 storage access ---
    // `build_jni_catalog_configs` only forwards the account name/key when both are set.
    // NOTE(review): `adlsgen2.endpoint` is not read by the JNI config builder in this
    // part of the file — confirm where it is consumed.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded verbatim to both the file IO (`S3_PATH_STYLE_ACCESS`) and the Java
    // catalog (`s3.path-style-access`) when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The http header to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for iceberg REST catalog.
    // When true, `catalog_type()` reports "rest_rust" for a "rest" catalog and
    // `headers()` adds the `X-Iceberg-Access-Delegation: vended-credentials` header.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
174
// Per-table object cache size: 32 MiB, the same as the iceberg::io::object_cache default.
// TODO: change it after object cache get refactored.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
// Overall byte budget for the shared object cache: 512 MiB.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
// Number of default-sized per-table caches that fit into the shared budget (16).
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
181
/// Declares which [`IcebergCommon`] option keys are covered by [`EnforceSecret`].
///
/// Presumably the listed options must be supplied through a secret reference rather
/// than as plain text when enforcement is active — confirm against the
/// `EnforceSecret` trait definition.
impl EnforceSecret for IcebergCommon {
    // NOTE(review): "adlsgen2.client_secret" has no matching field on `IcebergCommon`
    // in this file, and "azblob.account_key" (a credential field of the struct) is not
    // listed here — confirm both are intentional.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
196
/// Identifies an Iceberg table by an optional database name plus a table name.
///
/// Deserialization rejects any key other than `database.name` and `table.name`
/// (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional; when present it becomes the first component of the table identifier
    // built by `to_table_ident`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Full name of table, must include schema name when database is provided.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
207
208impl IcebergTableIdentifier {
209    pub fn database_name(&self) -> Option<&str> {
210        self.database_name.as_deref()
211    }
212
213    pub fn table_name(&self) -> &str {
214        &self.table_name
215    }
216
217    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
218        let ret = if let Some(database_name) = &self.database_name {
219            TableIdent::from_strs(vec![database_name, &self.table_name])
220        } else {
221            TableIdent::from_strs(vec![&self.table_name])
222        };
223
224        Ok(ret.context("Failed to create table identifier")?)
225    }
226}
227
228impl IcebergCommon {
229    pub fn catalog_type(&self) -> &str {
230        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
231        if self.vended_credentials() && catalog_type == "rest" {
232            "rest_rust"
233        } else {
234            catalog_type
235        }
236    }
237
238    pub fn vended_credentials(&self) -> bool {
239        self.vended_credentials.unwrap_or(false)
240    }
241
242    fn glue_access_key(&self) -> Option<&str> {
243        self.glue_access_key
244            .as_deref()
245            .or(self.s3_access_key.as_deref())
246    }
247
248    fn glue_secret_key(&self) -> Option<&str> {
249        self.glue_secret_key
250            .as_deref()
251            .or(self.s3_secret_key.as_deref())
252    }
253
254    fn glue_region(&self) -> Option<&str> {
255        self.glue_region.as_deref().or(self.s3_region.as_deref())
256    }
257
258    pub fn catalog_name(&self) -> String {
259        self.catalog_name
260            .as_ref()
261            .cloned()
262            .unwrap_or_else(|| "risingwave".to_owned())
263    }
264
265    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
266        let mut headers = HashMap::new();
267        let user_agent = match Deployment::current() {
268            Deployment::Ci => "RisingWave(CI)".to_owned(),
269            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
270            Deployment::Other => "RisingWave(OSS)".to_owned(),
271        };
272        if self.vended_credentials() {
273            headers.insert(
274                "X-Iceberg-Access-Delegation".to_owned(),
275                "vended-credentials".to_owned(),
276            );
277        }
278        headers.insert("User-Agent".to_owned(), user_agent);
279        if let Some(header) = &self.catalog_header {
280            for pair in header.split(';') {
281                let mut parts = pair.split('=');
282                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
283                    headers.insert(key.to_owned(), value.to_owned());
284                } else {
285                    bail!("Invalid header format: {}", pair);
286                }
287            }
288        }
289        Ok(headers)
290    }
291
292    pub fn enable_config_load(&self) -> bool {
293        // If the env var is set to true, we disable the default config load. (Cloud environment)
294        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
295            if matches!(self.enable_config_load, Some(true)) {
296                tracing::warn!(
297                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
298                );
299            }
300            return false;
301        }
302        self.enable_config_load.unwrap_or(false)
303    }
304
305    /// For both V1 and V2.
306    fn build_jni_catalog_configs(
307        &self,
308        java_catalog_props: &HashMap<String, String>,
309    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
310        let mut iceberg_configs = HashMap::new();
311        let enable_config_load = self.enable_config_load();
312        let file_io_props = {
313            let catalog_type = self.catalog_type().to_owned();
314
315            if let Some(region) = &self.s3_region {
316                // iceberg-rust
317                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
318            }
319
320            if let Some(endpoint) = &self.s3_endpoint {
321                // iceberg-rust
322                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
323            }
324
325            // iceberg-rust
326            if let Some(access_key) = &self.s3_access_key {
327                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
328            }
329            if let Some(secret_key) = &self.s3_secret_key {
330                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
331            }
332            if let Some(role_arn) = &self.s3_iam_role_arn {
333                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
334            }
335            if let Some(gcs_credential) = &self.gcs_credential {
336                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
337                if catalog_type != "rest" && catalog_type != "rest_rust" {
338                    bail!("gcs unsupported in {} catalog", &catalog_type);
339                }
340            }
341
342            if let (
343                Some(azblob_account_name),
344                Some(azblob_account_key),
345                Some(azblob_endpoint_url),
346            ) = (
347                &self.azblob_account_name,
348                &self.azblob_account_key,
349                &self.azblob_endpoint_url,
350            ) {
351                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
352                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
353                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
354
355                if catalog_type != "rest" && catalog_type != "rest_rust" {
356                    bail!("azblob unsupported in {} catalog", &catalog_type);
357                }
358            }
359
360            if let (Some(account_name), Some(account_key)) = (
361                self.adlsgen2_account_name.as_ref(),
362                self.adlsgen2_account_key.as_ref(),
363            ) {
364                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
365                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
366                if catalog_type != "rest" && catalog_type != "rest_rust" {
367                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
368                }
369            }
370
371            match &self.warehouse_path {
372                Some(warehouse_path) => {
373                    let (bucket, _) = {
374                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
375                        let url = Url::parse(warehouse_path);
376                        if (url.is_err() || is_s3_tables)
377                            && (catalog_type == "rest" || catalog_type == "rest_rust")
378                        {
379                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
380                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
381                            // so we allow it to pass here.
382                            (None, None)
383                        } else {
384                            let url = url.with_context(|| {
385                                format!("Invalid warehouse path: {}", warehouse_path)
386                            })?;
387                            let bucket = url
388                                .host_str()
389                                .with_context(|| {
390                                    format!(
391                                        "Invalid s3 path: {}, bucket is missing",
392                                        warehouse_path
393                                    )
394                                })?
395                                .to_owned();
396                            let root = url.path().trim_start_matches('/').to_owned();
397                            (Some(bucket), Some(root))
398                        }
399                    };
400
401                    if let Some(bucket) = bucket {
402                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
403                    }
404                }
405                None => {
406                    if catalog_type != "rest" && catalog_type != "rest_rust" {
407                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
408                    }
409                }
410            }
411            iceberg_configs.insert(
412                S3_DISABLE_CONFIG_LOAD.to_owned(),
413                (!enable_config_load).to_string(),
414            );
415
416            iceberg_configs.insert(
417                GCS_DISABLE_CONFIG_LOAD.to_owned(),
418                (!enable_config_load).to_string(),
419            );
420
421            if let Some(path_style_access) = self.s3_path_style_access {
422                iceberg_configs.insert(
423                    S3_PATH_STYLE_ACCESS.to_owned(),
424                    path_style_access.to_string(),
425                );
426            }
427
428            iceberg_configs
429        };
430
431        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
432        let mut java_catalog_configs = HashMap::new();
433        {
434            if let Some(uri) = self.catalog_uri.as_deref() {
435                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
436            }
437
438            if let Some(warehouse_path) = &self.warehouse_path {
439                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
440            }
441            java_catalog_configs.extend(java_catalog_props.clone());
442
443            // Currently we only support s3, so let's set it to s3
444            java_catalog_configs.insert(
445                "io-impl".to_owned(),
446                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
447            );
448
449            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
450            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
451
452            if let Some(region) = &self.s3_region {
453                java_catalog_configs.insert("client.region".to_owned(), region.clone());
454            }
455            if let Some(endpoint) = &self.s3_endpoint {
456                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
457            }
458
459            if let Some(access_key) = &self.s3_access_key {
460                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
461            }
462            if let Some(secret_key) = &self.s3_secret_key {
463                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
464            }
465
466            if let Some(path_style_access) = &self.s3_path_style_access {
467                java_catalog_configs.insert(
468                    "s3.path-style-access".to_owned(),
469                    path_style_access.to_string(),
470                );
471            }
472
473            let headers = self.headers()?;
474            for (header_name, header_value) in headers {
475                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
476            }
477
478            match self.catalog_type() {
479                "rest" => {
480                    if let Some(credential) = &self.catalog_credential {
481                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
482                    }
483                    if let Some(token) = &self.catalog_token {
484                        java_catalog_configs.insert("token".to_owned(), token.clone());
485                    }
486                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
487                        java_catalog_configs
488                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
489                    }
490                    if let Some(scope) = &self.catalog_scope {
491                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
492                    }
493                    if let Some(rest_signing_region) = &self.rest_signing_region {
494                        java_catalog_configs.insert(
495                            "rest.signing-region".to_owned(),
496                            rest_signing_region.clone(),
497                        );
498                    }
499                    if let Some(rest_signing_name) = &self.rest_signing_name {
500                        java_catalog_configs
501                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
502                    }
503                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
504                        java_catalog_configs.insert(
505                            "rest.sigv4-enabled".to_owned(),
506                            rest_sigv4_enabled.to_string(),
507                        );
508
509                        if let Some(access_key) = &self.s3_access_key {
510                            java_catalog_configs
511                                .insert("rest.access-key-id".to_owned(), access_key.clone());
512                        }
513
514                        if let Some(secret_key) = &self.s3_secret_key {
515                            java_catalog_configs
516                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
517                        }
518                    }
519                }
520                "glue" => {
521                    let glue_access_key = self.glue_access_key();
522                    let glue_secret_key = self.glue_secret_key();
523                    let has_glue_credentials =
524                        glue_access_key.is_some() && glue_secret_key.is_some();
525                    let should_configure_glue_provider = !enable_config_load
526                        || has_glue_credentials
527                        || self.glue_iam_role_arn.is_some();
528
529                    if should_configure_glue_provider {
530                        java_catalog_configs.insert(
531                            "client.credentials-provider".to_owned(),
532                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
533                        );
534                        if let Some(region) = self.glue_region() {
535                            java_catalog_configs.insert(
536                                "client.credentials-provider.glue.region".to_owned(),
537                                region.to_owned(),
538                            );
539                        }
540                        if let Some(access_key) = glue_access_key {
541                            java_catalog_configs.insert(
542                                "client.credentials-provider.glue.access-key-id".to_owned(),
543                                access_key.to_owned(),
544                            );
545                        }
546                        if let Some(secret_key) = glue_secret_key {
547                            java_catalog_configs.insert(
548                                "client.credentials-provider.glue.secret-access-key".to_owned(),
549                                secret_key.to_owned(),
550                            );
551                        }
552                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
553                            java_catalog_configs.insert(
554                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
555                                role_arn.to_owned(),
556                            );
557                        }
558                        if enable_config_load && !has_glue_credentials {
559                            java_catalog_configs.insert(
560                                "client.credentials-provider.glue.use-default-credential-chain"
561                                    .to_owned(),
562                                "true".to_owned(),
563                            );
564                        }
565                    }
566
567                    if let Some(region) = self.glue_region() {
568                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
569                        java_catalog_configs.insert(
570                            "glue.endpoint".to_owned(),
571                            format!("https://glue.{}.amazonaws.com", region),
572                        );
573                    }
574
575                    if let Some(glue_id) = self.glue_id.as_deref() {
576                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
577                    }
578                }
579                _ => {}
580            }
581        }
582
583        Ok((file_io_props, java_catalog_configs))
584    }
585}
586
587impl IcebergCommon {
588    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
589    pub async fn create_catalog(
590        &self,
591        java_catalog_props: &HashMap<String, String>,
592    ) -> ConnectorResult<Arc<dyn Catalog>> {
593        match self.catalog_type() {
594            "storage" => {
595                let warehouse = self
596                    .warehouse_path
597                    .clone()
598                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
599                let url = Url::parse(warehouse.as_ref())
600                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
601
602                let config = match url.scheme() {
603                    "s3" | "s3a" => StorageCatalogConfig::S3(
604                        storage_catalog::StorageCatalogS3Config::builder()
605                            .warehouse(warehouse)
606                            .access_key(self.s3_access_key.clone())
607                            .secret_key(self.s3_secret_key.clone())
608                            .region(self.s3_region.clone())
609                            .endpoint(self.s3_endpoint.clone())
610                            .path_style_access(self.s3_path_style_access)
611                            .enable_config_load(Some(self.enable_config_load()))
612                            .build(),
613                    ),
614                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
615                        storage_catalog::StorageCatalogGcsConfig::builder()
616                            .warehouse(warehouse)
617                            .credential(self.gcs_credential.clone())
618                            .enable_config_load(Some(self.enable_config_load()))
619                            .build(),
620                    ),
621                    "azblob" => StorageCatalogConfig::Azblob(
622                        storage_catalog::StorageCatalogAzblobConfig::builder()
623                            .warehouse(warehouse)
624                            .account_name(self.azblob_account_name.clone())
625                            .account_key(self.azblob_account_key.clone())
626                            .endpoint(self.azblob_endpoint_url.clone())
627                            .build(),
628                    ),
629                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
630                };
631
632                let catalog = storage_catalog::StorageCatalog::new(config)?;
633                Ok(Arc::new(catalog))
634            }
635            "rest_rust" => {
636                let mut iceberg_configs = HashMap::new();
637
638                // check gcs credential or s3 access key and secret key
639                if let Some(gcs_credential) = &self.gcs_credential {
640                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
641                } else {
642                    if let Some(region) = &self.s3_region {
643                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
644                    }
645                    if let Some(endpoint) = &self.s3_endpoint {
646                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
647                    }
648                    if let Some(access_key) = &self.s3_access_key {
649                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
650                    }
651                    if let Some(secret_key) = &self.s3_secret_key {
652                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
653                    }
654                    if let Some(path_style_access) = &self.s3_path_style_access {
655                        iceberg_configs.insert(
656                            S3_PATH_STYLE_ACCESS.to_owned(),
657                            path_style_access.to_string(),
658                        );
659                    }
660                };
661
662                if let Some(credential) = &self.catalog_credential {
663                    iceberg_configs.insert("credential".to_owned(), credential.clone());
664                }
665                if let Some(token) = &self.catalog_token {
666                    iceberg_configs.insert("token".to_owned(), token.clone());
667                }
668                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
669                    iceberg_configs
670                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
671                }
672                if let Some(scope) = &self.catalog_scope {
673                    iceberg_configs.insert("scope".to_owned(), scope.clone());
674                }
675
676                let headers = self.headers()?;
677                for (header_name, header_value) in headers {
678                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
679                }
680
681                iceberg_configs.insert(
682                    iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
683                    self.catalog_uri
684                        .clone()
685                        .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
686                );
687                if let Some(warehouse_path) = &self.warehouse_path {
688                    iceberg_configs.insert(
689                        iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
690                        warehouse_path.clone(),
691                    );
692                }
693                let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
694                    .load("rest", iceberg_configs)
695                    .await
696                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
697                Ok(Arc::new(catalog))
698            }
699            "glue_rust" => {
700                let mut iceberg_configs = HashMap::new();
701                // glue
702                if let Some(region) = self.glue_region() {
703                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
704                }
705                if let Some(access_key) = self.glue_access_key() {
706                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
707                }
708                if let Some(secret_key) = self.glue_secret_key() {
709                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
710                }
711                // s3
712                if let Some(region) = &self.s3_region {
713                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
714                }
715                if let Some(endpoint) = &self.s3_endpoint {
716                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
717                }
718                if let Some(access_key) = &self.s3_access_key {
719                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
720                }
721                if let Some(secret_key) = &self.s3_secret_key {
722                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
723                }
724                if let Some(role_arn) = &self.s3_iam_role_arn {
725                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
726                }
727                if let Some(path_style_access) = &self.s3_path_style_access {
728                    iceberg_configs.insert(
729                        S3_PATH_STYLE_ACCESS.to_owned(),
730                        path_style_access.to_string(),
731                    );
732                }
733                iceberg_configs.insert(
734                    iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
735                    self.warehouse_path
736                        .clone()
737                        .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
738                );
739                if let Some(uri) = self.catalog_uri.as_deref() {
740                    iceberg_configs.insert(
741                        iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
742                        uri.to_owned(),
743                    );
744                }
745                let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
746                    .load("glue", iceberg_configs)
747                    .await
748                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
749                Ok(Arc::new(catalog))
750            }
751            catalog_type
752                if catalog_type == "hive"
753                    || catalog_type == "snowflake"
754                    || catalog_type == "jdbc"
755                    || catalog_type == "rest"
756                    || catalog_type == "glue" =>
757            {
758                // Create java catalog
759                let (file_io_props, java_catalog_props) =
760                    self.build_jni_catalog_configs(java_catalog_props)?;
761                let catalog_impl = match catalog_type {
762                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
763                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
764                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
765                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
766                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
767                    _ => unreachable!(),
768                };
769
770                jni_catalog::JniCatalog::build_catalog(
771                    file_io_props,
772                    self.catalog_name(),
773                    catalog_impl,
774                    java_catalog_props,
775                )
776            }
777            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
778            _ => {
779                bail!(
780                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
781                    self.catalog_type()
782                )
783            }
784        }
785    }
786
787    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
788    pub async fn load_table(
789        &self,
790        table: &IcebergTableIdentifier,
791        java_catalog_props: &HashMap<String, String>,
792    ) -> ConnectorResult<Table> {
793        let catalog = self
794            .create_catalog(java_catalog_props)
795            .await
796            .context("Unable to load iceberg catalog")?;
797
798        let table_id = table
799            .to_table_ident()
800            .context("Unable to parse table name")?;
801
802        let table = catalog.load_table(&table_id).await?;
803        Ok(rebuild_table_with_shared_cache(table).await)
804    }
805}
806
807/// Get a globally shared object cache keyed by table UUID to avoid reuse after drop & recreate.
808pub(crate) async fn shared_object_cache(
809    init_object_cache: Arc<ObjectCache>,
810    table_uuid: Uuid,
811) -> Arc<ObjectCache> {
812    static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
813        MokaCache::builder()
814            .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
815            .build()
816    });
817
818    CACHE
819        .get_with(table_uuid, async { init_object_cache })
820        .await
821}
822
823pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
824    let table_uuid = table.metadata().uuid();
825    let init_object_cache = table.object_cache();
826    let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
827    table.with_object_cache(object_cache)
828}