risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
24use ::iceberg::table::Table;
25use ::iceberg::{Catalog, TableIdent};
26use anyhow::{Context, anyhow};
27use iceberg::io::{
28    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
29    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
30};
31use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
32use phf::{Set, phf_set};
33use risingwave_common::bail;
34use risingwave_common::util::deployment::Deployment;
35use risingwave_common::util::env_var::env_var_is_true;
36use serde::Deserialize;
37use serde_with::serde_as;
38use url::Url;
39use with_options::WithOptions;
40
41use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
42use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
43use crate::deserialize_optional_bool_from_string;
44use crate::enforce_secret::EnforceSecret;
45use crate::error::ConnectorResult;
46
/// Iceberg connection options: catalog selection, object-store credentials and
/// catalog authentication.
///
/// Each field is deserialized from the option key named in its
/// `#[serde(rename = "...")]` attribute; fields without a rename use the field
/// name itself as the key. All options are optional at this level; the methods
/// on this type enforce which ones a given catalog type actually requires.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 region. The glue catalogs in this file reuse it as the Glue region.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    // S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    // S3 access key id. Also reused as the Glue credential and, when
    // `catalog.rest.sigv4_enabled` is set, as the REST signing credential.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    // S3 secret access key; reused in the same places as `s3.access.key`.
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    // GCS credential, forwarded to the file IO under `GCS_CREDENTIALS_JSON`.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob account name / key / endpoint. The JNI config builder only
    // forwards them when all three are set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 account name / key, forwarded only when both are set.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    // NOTE(review): this option is not read anywhere in this file — confirm it
    // is consumed elsewhere.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Whether S3 path-style access is used; forwarded unchanged when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The http header to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub header: Option<String>,

    /// Enable vended credentials for iceberg REST catalog.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
157
// Option keys of `IcebergCommon` that are registered as secret-enforced.
// Presumably these must be supplied through a secret reference rather than as
// plain text — see the `EnforceSecret` trait for the exact contract.
//
// NOTE(review): `adlsgen2.client_secret` has no matching field on
// `IcebergCommon` in this file, and `azblob.account_key` (which does exist as a
// field) is not listed here — confirm both are intentional.
impl EnforceSecret for IcebergCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
    };
}
170
/// Options naming a single iceberg table: an optional database plus a table name.
///
/// `deny_unknown_fields` makes deserialization fail on any key other than
/// `database.name` and `table.name`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional database name; when present it is placed before the table name
    // in the resulting table identifier.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Full name of table, must include schema name when database is provided.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
181
182impl IcebergTableIdentifier {
183    pub fn database_name(&self) -> Option<&str> {
184        self.database_name.as_deref()
185    }
186
187    pub fn table_name(&self) -> &str {
188        &self.table_name
189    }
190
191    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
192        let ret = if let Some(database_name) = &self.database_name {
193            TableIdent::from_strs(vec![database_name, &self.table_name])
194        } else {
195            TableIdent::from_strs(vec![&self.table_name])
196        };
197
198        Ok(ret.context("Failed to create table identifier")?)
199    }
200}
201
202impl IcebergCommon {
203    pub fn catalog_type(&self) -> &str {
204        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
205        if self.vended_credentials() && catalog_type == "rest" {
206            "rest_rust"
207        } else {
208            catalog_type
209        }
210    }
211
212    pub fn vended_credentials(&self) -> bool {
213        self.vended_credentials.unwrap_or(false)
214    }
215
216    pub fn catalog_name(&self) -> String {
217        self.catalog_name
218            .as_ref()
219            .cloned()
220            .unwrap_or_else(|| "risingwave".to_owned())
221    }
222
223    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
224        let mut headers = HashMap::new();
225        let user_agent = match Deployment::current() {
226            Deployment::Ci => "RisingWave(CI)".to_owned(),
227            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
228            Deployment::Other => "RisingWave(OSS)".to_owned(),
229        };
230        if self.vended_credentials() {
231            headers.insert(
232                "X-Iceberg-Access-Delegation".to_owned(),
233                "vended-credentials".to_owned(),
234            );
235        }
236        headers.insert("User-Agent".to_owned(), user_agent);
237        if let Some(header) = &self.header {
238            for pair in header.split(';') {
239                let mut parts = pair.split('=');
240                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
241                    headers.insert(key.to_owned(), value.to_owned());
242                } else {
243                    bail!("Invalid header format: {}", pair);
244                }
245            }
246        }
247        Ok(headers)
248    }
249
250    pub fn enable_config_load(&self) -> bool {
251        // If the env var is set to true, we disable the default config load. (Cloud environment)
252        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
253            return false;
254        }
255        self.enable_config_load.unwrap_or(false)
256    }
257
    /// Builds the two property maps used to construct a JNI (java) catalog.
    ///
    /// Returns `(file_io_props, java_catalog_configs)`:
    /// - `file_io_props` uses the iceberg-rust file IO keys (`S3_*`, `GCS_*`,
    ///   `AZBLOB_*`, `ADLS_*`, ...).
    /// - `java_catalog_configs` uses the java iceberg catalog property names
    ///   (`uri`, `warehouse`, `io-impl`, `s3.*`, `header.*`, ...), merged with
    ///   the caller supplied `java_catalog_props`.
    ///
    /// For both V1 and V2.
    ///
    /// # Errors
    /// - gcs / azblob / adlsgen2 options are set but the catalog type is
    ///   neither `rest` nor `rest_rust`;
    /// - `warehouse.path` is missing for a non-rest catalog, or is not a valid
    ///   URL / has no host (for rest catalogs an unparsable path or an
    ///   s3tables ARN is accepted as-is);
    /// - `catalog.header` is malformed (see `headers`).
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        let file_io_props = {
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.region {
                // iceberg-rust
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.endpoint {
                // iceberg-rust
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            // iceberg-rust
            if let Some(access_key) = &self.access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            // For the non-S3 stores below the value is inserted before the
            // catalog type is validated; on failure `bail!` returns and the
            // partially filled map is dropped, so the order is harmless.
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            // Azure Blob options are forwarded only when all three are set;
            // a partial set is ignored without an error.
            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            // ADLS Gen2 options are forwarded only when both name and key are set.
            if let (Some(account_name), Some(account_key)) = (
                self.adlsgen2_account_name.as_ref(),
                self.adlsgen2_account_key.as_ref(),
            ) {
                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    // Only the bucket (URL host) is used; the root path is
                    // computed but discarded.
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        if (url.is_err() || is_s3_tables)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
                            // so we allow it to pass here.
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    // Only rest catalogs may omit the warehouse path.
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            // The file IO keys are "disable" flags, hence the negation.
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            // Precedence: caller supplied props override `uri` / `warehouse`
            // set above, but every key inserted below overrides the caller's.
            java_catalog_configs.extend(java_catalog_props.clone());

            // Currently we only support s3, so let's set it to s3
            java_catalog_configs.insert(
                "io-impl".to_owned(),
                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
            );

            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            if let Some(region) = &self.region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            // HTTP headers are passed to the java catalog as `header.<name>` properties.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            // Catalog-specific properties. Only "rest" and "glue" add any.
            match self.catalog_type() {
                "rest" => {
                    if let Some(credential) = &self.credential {
                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    // NOTE(review): the signing credentials below are added
                    // whenever the option is present, including when its value
                    // is `false` — confirm that is intended.
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                "glue" => {
                    // With config load disabled, credentials are passed
                    // explicitly through the custom credential provider.
                    if !enable_config_load {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        // Use S3 ak/sk and region as glue ak/sk and region by default.
                        // TODO: use different ak/sk and region for s3 and glue.
                        if let Some(access_key) = &self.access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.clone(),
                            );
                        }
                        if let Some(secret_key) = &self.secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.clone(),
                            );
                        }
                    }

                    if let Some(region) = &self.region {
                        // `client.region` was already set above from the same
                        // value; this re-insert is redundant but harmless.
                        java_catalog_configs.insert("client.region".to_owned(), region.clone());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
510}
511
512impl IcebergCommon {
513    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
514    pub async fn create_catalog(
515        &self,
516        java_catalog_props: &HashMap<String, String>,
517    ) -> ConnectorResult<Arc<dyn Catalog>> {
518        match self.catalog_type() {
519            "storage" => {
520                let warehouse = self
521                    .warehouse_path
522                    .clone()
523                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
524                let url = Url::parse(warehouse.as_ref())
525                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
526
527                let config = match url.scheme() {
528                    "s3" | "s3a" => StorageCatalogConfig::S3(
529                        storage_catalog::StorageCatalogS3Config::builder()
530                            .warehouse(warehouse)
531                            .access_key(self.access_key.clone())
532                            .secret_key(self.secret_key.clone())
533                            .region(self.region.clone())
534                            .endpoint(self.endpoint.clone())
535                            .path_style_access(self.path_style_access)
536                            .enable_config_load(Some(self.enable_config_load()))
537                            .build(),
538                    ),
539                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
540                        storage_catalog::StorageCatalogGcsConfig::builder()
541                            .warehouse(warehouse)
542                            .credential(self.gcs_credential.clone())
543                            .enable_config_load(Some(self.enable_config_load()))
544                            .build(),
545                    ),
546                    "azblob" => StorageCatalogConfig::Azblob(
547                        storage_catalog::StorageCatalogAzblobConfig::builder()
548                            .warehouse(warehouse)
549                            .account_name(self.azblob_account_name.clone())
550                            .account_key(self.azblob_account_key.clone())
551                            .endpoint(self.azblob_endpoint_url.clone())
552                            .build(),
553                    ),
554                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
555                };
556
557                let catalog = storage_catalog::StorageCatalog::new(config)?;
558                Ok(Arc::new(catalog))
559            }
560            "rest_rust" => {
561                let mut iceberg_configs = HashMap::new();
562
563                // check gcs credential or s3 access key and secret key
564                if let Some(gcs_credential) = &self.gcs_credential {
565                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
566                } else {
567                    if let Some(region) = &self.region {
568                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
569                    }
570                    if let Some(endpoint) = &self.endpoint {
571                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
572                    }
573                    if let Some(access_key) = &self.access_key {
574                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
575                    }
576                    if let Some(secret_key) = &self.secret_key {
577                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
578                    }
579                    if let Some(path_style_access) = &self.path_style_access {
580                        iceberg_configs.insert(
581                            S3_PATH_STYLE_ACCESS.to_owned(),
582                            path_style_access.to_string(),
583                        );
584                    }
585                };
586
587                if let Some(credential) = &self.credential {
588                    iceberg_configs.insert("credential".to_owned(), credential.clone());
589                }
590                if let Some(token) = &self.token {
591                    iceberg_configs.insert("token".to_owned(), token.clone());
592                }
593                if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
594                    iceberg_configs
595                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
596                }
597                if let Some(scope) = &self.scope {
598                    iceberg_configs.insert("scope".to_owned(), scope.clone());
599                }
600
601                let headers = self.headers()?;
602                for (header_name, header_value) in headers {
603                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
604                }
605
606                let config_builder =
607                    iceberg_catalog_rest::RestCatalogConfig::builder()
608                        .uri(self.catalog_uri.clone().with_context(|| {
609                            "`catalog.uri` must be set in rest catalog".to_owned()
610                        })?)
611                        .props(iceberg_configs);
612
613                let config = match &self.warehouse_path {
614                    Some(warehouse_path) => {
615                        config_builder.warehouse(warehouse_path.clone()).build()
616                    }
617                    None => config_builder.build(),
618                };
619                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
620                Ok(Arc::new(catalog))
621            }
622            "glue_rust" => {
623                let mut iceberg_configs = HashMap::new();
624                // glue
625                if let Some(region) = &self.region {
626                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
627                }
628                if let Some(access_key) = &self.access_key {
629                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
630                }
631                if let Some(secret_key) = &self.secret_key {
632                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
633                }
634                // s3
635                if let Some(region) = &self.region {
636                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
637                }
638                if let Some(endpoint) = &self.endpoint {
639                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
640                }
641                if let Some(access_key) = &self.access_key {
642                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
643                }
644                if let Some(secret_key) = &self.secret_key {
645                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
646                }
647                if let Some(path_style_access) = &self.path_style_access {
648                    iceberg_configs.insert(
649                        S3_PATH_STYLE_ACCESS.to_owned(),
650                        path_style_access.to_string(),
651                    );
652                }
653                let config_builder =
654                    iceberg_catalog_glue::GlueCatalogConfig::builder()
655                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
656                            anyhow!("`warehouse.path` must be set in glue catalog")
657                        })?)
658                        .props(iceberg_configs);
659                let config = if let Some(uri) = self.catalog_uri.as_deref() {
660                    config_builder.uri(uri.to_owned()).build()
661                } else {
662                    config_builder.build()
663                };
664                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
665                Ok(Arc::new(catalog))
666            }
667            catalog_type
668                if catalog_type == "hive"
669                    || catalog_type == "snowflake"
670                    || catalog_type == "jdbc"
671                    || catalog_type == "rest"
672                    || catalog_type == "glue" =>
673            {
674                // Create java catalog
675                let (file_io_props, java_catalog_props) =
676                    self.build_jni_catalog_configs(java_catalog_props)?;
677                let catalog_impl = match catalog_type {
678                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
679                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
680                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
681                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
682                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
683                    _ => unreachable!(),
684                };
685
686                jni_catalog::JniCatalog::build_catalog(
687                    file_io_props,
688                    self.catalog_name(),
689                    catalog_impl,
690                    java_catalog_props,
691                )
692            }
693            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
694            _ => {
695                bail!(
696                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
697                    self.catalog_type()
698                )
699            }
700        }
701    }
702
703    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
704    pub async fn load_table(
705        &self,
706        table: &IcebergTableIdentifier,
707        java_catalog_props: &HashMap<String, String>,
708    ) -> ConnectorResult<Table> {
709        let catalog = self
710            .create_catalog(java_catalog_props)
711            .await
712            .context("Unable to load iceberg catalog")?;
713
714        let table_id = table
715            .to_table_ident()
716            .context("Unable to parse table name")?;
717
718        catalog.load_table(&table_id).await.map_err(Into::into)
719    }
720}