risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::{
30    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
31    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
32};
33use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
34use phf::{Set, phf_set};
35use risingwave_common::bail;
36use risingwave_common::error::IcebergError;
37use risingwave_common::util::deployment::Deployment;
38use risingwave_common::util::env_var::env_var_is_true;
39use serde::Deserialize;
40use serde_with::serde_as;
41use url::Url;
42use with_options::WithOptions;
43
44use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
45use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
46use crate::deserialize_optional_bool_from_string;
47use crate::enforce_secret::EnforceSecret;
48use crate::error::ConnectorResult;
49
/// Connection options shared by the Iceberg connector code in this module.
///
/// Every field is optional and is deserialized from a flat option key; the key is the
/// `rename` value where one is given, otherwise the field name itself.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    // Read it through `catalog_type()`, which also maps "rest" to "rest_rust" when
    // vended credentials are enabled.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 settings. Besides configuring S3 access, region and the access/secret keys double as
    // the fallback for the corresponding `glue.*` options (see `glue_region()` etc.).
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue settings. Access key, secret key and region fall back to their `s3.*`
    // counterparts when unset; the role ARN has no fallback.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential; `build_jni_catalog_configs` rejects it unless the catalog type is
    // "rest" / "rest_rust".
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings; `build_jni_catalog_configs` only applies them when all three
    // are present.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings; `build_jni_catalog_configs` applies account name and key
    // together and does not read the endpoint.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded as the S3 path-style-access setting when present; left unset otherwise.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    // No `rename`: the option key is the field name `enable_config_load`.
    // Read it through `enable_config_load()`, which can force the value to `false`.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The http header to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for iceberg REST catalog.
    // No `rename`: the option key is the field name `vended_credentials`.
    // When true, `headers()` adds `X-Iceberg-Access-Delegation: vended-credentials`.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
171
/// Declares which `IcebergCommon` option keys are covered by [`EnforceSecret`].
///
/// The keys are the serde-renamed option names, not the Rust field names.
/// Presumably a listed key must be supplied through a secret rather than as plain text;
/// the enforcement logic lives in the `EnforceSecret` trait, which is not visible here —
/// TODO confirm.
impl EnforceSecret for IcebergCommon {
    // NOTE(review): `adlsgen2.client_secret` has no matching field on `IcebergCommon`, and
    // `azblob.account_key` is a credential field that is not listed here. Confirm whether
    // both are intentional before relying on this set as the complete list of secrets.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
186
/// Options naming the Iceberg table to operate on.
///
/// Unknown option keys are rejected at deserialization time (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional; when present it becomes the leading component of the identifier built by
    // `to_table_ident()`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Full name of table, must include schema name when database is provided.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
197
198impl IcebergTableIdentifier {
199    pub fn database_name(&self) -> Option<&str> {
200        self.database_name.as_deref()
201    }
202
203    pub fn table_name(&self) -> &str {
204        &self.table_name
205    }
206
207    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
208        let ret = if let Some(database_name) = &self.database_name {
209            TableIdent::from_strs(vec![database_name, &self.table_name])
210        } else {
211            TableIdent::from_strs(vec![&self.table_name])
212        };
213
214        Ok(ret.context("Failed to create table identifier")?)
215    }
216}
217
218impl IcebergCommon {
219    pub fn catalog_type(&self) -> &str {
220        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
221        if self.vended_credentials() && catalog_type == "rest" {
222            "rest_rust"
223        } else {
224            catalog_type
225        }
226    }
227
228    pub fn vended_credentials(&self) -> bool {
229        self.vended_credentials.unwrap_or(false)
230    }
231
232    fn glue_access_key(&self) -> Option<&str> {
233        self.glue_access_key
234            .as_deref()
235            .or(self.s3_access_key.as_deref())
236    }
237
238    fn glue_secret_key(&self) -> Option<&str> {
239        self.glue_secret_key
240            .as_deref()
241            .or(self.s3_secret_key.as_deref())
242    }
243
244    fn glue_region(&self) -> Option<&str> {
245        self.glue_region.as_deref().or(self.s3_region.as_deref())
246    }
247
248    pub fn catalog_name(&self) -> String {
249        self.catalog_name
250            .as_ref()
251            .cloned()
252            .unwrap_or_else(|| "risingwave".to_owned())
253    }
254
255    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
256        let mut headers = HashMap::new();
257        let user_agent = match Deployment::current() {
258            Deployment::Ci => "RisingWave(CI)".to_owned(),
259            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
260            Deployment::Other => "RisingWave(OSS)".to_owned(),
261        };
262        if self.vended_credentials() {
263            headers.insert(
264                "X-Iceberg-Access-Delegation".to_owned(),
265                "vended-credentials".to_owned(),
266            );
267        }
268        headers.insert("User-Agent".to_owned(), user_agent);
269        if let Some(header) = &self.catalog_header {
270            for pair in header.split(';') {
271                let mut parts = pair.split('=');
272                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
273                    headers.insert(key.to_owned(), value.to_owned());
274                } else {
275                    bail!("Invalid header format: {}", pair);
276                }
277            }
278        }
279        Ok(headers)
280    }
281
282    pub fn enable_config_load(&self) -> bool {
283        // If the env var is set to true, we disable the default config load. (Cloud environment)
284        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
285            if matches!(self.enable_config_load, Some(true)) {
286                tracing::warn!(
287                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
288                );
289            }
290            return false;
291        }
292        self.enable_config_load.unwrap_or(false)
293    }
294
    /// For both V1 and V2.
    ///
    /// Builds the two property maps needed to set up a JNI-backed catalog and returns them as
    /// `(file_io_props, java_catalog_configs)`:
    ///
    /// - `file_io_props`: storage settings keyed by the `iceberg::io` constants (S3, GCS,
    ///   Azure Blob, ADLS), plus `iceberg.table.io.bucket` when the warehouse path is a URL
    ///   with a host.
    /// - `java_catalog_configs`: properties for the Java catalog. `java_catalog_props` is
    ///   merged in right after `uri` / `warehouse`, so every key inserted afterwards
    ///   (`io-impl`, `client.region`, `s3.*`, `header.*`, the `rest` / `glue` specific keys)
    ///   overrides a caller-supplied value of the same name.
    ///
    /// # Errors
    ///
    /// - GCS, Azure Blob or ADLS options are set while the catalog type is neither `rest` nor
    ///   `rest_rust`.
    /// - `warehouse.path` is missing, is not a valid URL, or has no host, for catalog types
    ///   other than `rest` / `rest_rust`. For those two types a path with no host is also
    ///   rejected when it parses as a URL and is not an `arn:aws:s3tables` path.
    /// - `catalog.header` is malformed (see `headers()`).
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        // Part 1: file IO properties. `iceberg_configs` is filled in place and moved out as
        // the value of this block.
        let file_io_props = {
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.s3_region {
                // iceberg-rust
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.s3_endpoint {
                // iceberg-rust
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            // iceberg-rust
            if let Some(access_key) = &self.s3_access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(role_arn) = &self.s3_iam_role_arn {
                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
            }
            // Non-S3 backends below are only accepted for the rest / rest_rust catalog types.
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            // Azure Blob is applied only when name, key and endpoint are all provided; a
            // partial set is silently ignored.
            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            // ADLS Gen2 is applied only when both account name and key are provided;
            // `adlsgen2.endpoint` is not read here.
            if let (Some(account_name), Some(account_key)) = (
                self.adlsgen2_account_name.as_ref(),
                self.adlsgen2_account_key.as_ref(),
            ) {
                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    // Only the bucket (URL host) is used; the root path is computed in the
                    // URL branch but discarded by the `_` pattern.
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        if (url.is_err() || is_s3_tables)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
                            // so we allow it to pass here.
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    // Only rest / rest_rust catalogs may omit the warehouse path.
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            // The file IO flags are "disable" switches, hence the negation of
            // `enable_config_load`.
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.s3_path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            // Caller-supplied properties can override `uri` / `warehouse` above, and are in
            // turn overridden by every key inserted below.
            java_catalog_configs.extend(java_catalog_props.clone());

            // Currently we only support s3, so let's set it to s3
            java_catalog_configs.insert(
                "io-impl".to_owned(),
                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
            );

            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            // May be replaced by the Glue region in the `glue` arm further down.
            if let Some(region) = &self.s3_region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.s3_endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.s3_path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            // Every request header is passed through as a `header.<name>` property.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            // Catalog-type specific properties. `catalog_type()` reports `rest_rust` (not
            // `rest`) when vended credentials are enabled, so that case falls through to `_`.
            match self.catalog_type() {
                "rest" => {
                    if let Some(credential) = &self.catalog_credential {
                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.catalog_token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.catalog_scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    // NOTE(review): the S3 keys are forwarded as `rest.*` credentials whenever
                    // the sigv4 option is present, including when it is explicitly `false` —
                    // confirm that is intended.
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.s3_access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.s3_secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                "glue" => {
                    // Glue credentials fall back to the S3 ones (see `glue_access_key()` etc.).
                    let glue_access_key = self.glue_access_key();
                    let glue_secret_key = self.glue_secret_key();
                    let has_glue_credentials =
                        glue_access_key.is_some() && glue_secret_key.is_some();
                    // The custom provider is skipped only when config load is enabled and
                    // neither explicit credentials nor a role ARN were given.
                    let should_configure_glue_provider = !enable_config_load
                        || has_glue_credentials
                        || self.glue_iam_role_arn.is_some();

                    if should_configure_glue_provider {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        if let Some(region) = self.glue_region() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.region".to_owned(),
                                region.to_owned(),
                            );
                        }
                        if let Some(access_key) = glue_access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.to_owned(),
                            );
                        }
                        if let Some(secret_key) = glue_secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.to_owned(),
                            );
                        }
                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
                                role_arn.to_owned(),
                            );
                        }
                        // Reached only via the role-ARN condition above: config load is on
                        // but no complete key pair was supplied.
                        if enable_config_load && !has_glue_credentials {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.use-default-credential-chain"
                                    .to_owned(),
                                "true".to_owned(),
                            );
                        }
                    }

                    // Overrides the `client.region` set from `s3.region` earlier.
                    if let Some(region) = self.glue_region() {
                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
575}
576
577impl IcebergCommon {
578    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
579    pub async fn create_catalog(
580        &self,
581        java_catalog_props: &HashMap<String, String>,
582    ) -> ConnectorResult<Arc<dyn Catalog>> {
583        match self.catalog_type() {
584            "storage" => {
585                let warehouse = self
586                    .warehouse_path
587                    .clone()
588                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
589                let url = Url::parse(warehouse.as_ref())
590                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
591
592                let config = match url.scheme() {
593                    "s3" | "s3a" => StorageCatalogConfig::S3(
594                        storage_catalog::StorageCatalogS3Config::builder()
595                            .warehouse(warehouse)
596                            .access_key(self.s3_access_key.clone())
597                            .secret_key(self.s3_secret_key.clone())
598                            .region(self.s3_region.clone())
599                            .endpoint(self.s3_endpoint.clone())
600                            .path_style_access(self.s3_path_style_access)
601                            .enable_config_load(Some(self.enable_config_load()))
602                            .build(),
603                    ),
604                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
605                        storage_catalog::StorageCatalogGcsConfig::builder()
606                            .warehouse(warehouse)
607                            .credential(self.gcs_credential.clone())
608                            .enable_config_load(Some(self.enable_config_load()))
609                            .build(),
610                    ),
611                    "azblob" => StorageCatalogConfig::Azblob(
612                        storage_catalog::StorageCatalogAzblobConfig::builder()
613                            .warehouse(warehouse)
614                            .account_name(self.azblob_account_name.clone())
615                            .account_key(self.azblob_account_key.clone())
616                            .endpoint(self.azblob_endpoint_url.clone())
617                            .build(),
618                    ),
619                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
620                };
621
622                let catalog = storage_catalog::StorageCatalog::new(config)?;
623                Ok(Arc::new(catalog))
624            }
625            "rest_rust" => {
626                let mut iceberg_configs = HashMap::new();
627
628                // check gcs credential or s3 access key and secret key
629                if let Some(gcs_credential) = &self.gcs_credential {
630                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
631                } else {
632                    if let Some(region) = &self.s3_region {
633                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
634                    }
635                    if let Some(endpoint) = &self.s3_endpoint {
636                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
637                    }
638                    if let Some(access_key) = &self.s3_access_key {
639                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
640                    }
641                    if let Some(secret_key) = &self.s3_secret_key {
642                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
643                    }
644                    if let Some(path_style_access) = &self.s3_path_style_access {
645                        iceberg_configs.insert(
646                            S3_PATH_STYLE_ACCESS.to_owned(),
647                            path_style_access.to_string(),
648                        );
649                    }
650                };
651
652                if let Some(credential) = &self.catalog_credential {
653                    iceberg_configs.insert("credential".to_owned(), credential.clone());
654                }
655                if let Some(token) = &self.catalog_token {
656                    iceberg_configs.insert("token".to_owned(), token.clone());
657                }
658                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
659                    iceberg_configs
660                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
661                }
662                if let Some(scope) = &self.catalog_scope {
663                    iceberg_configs.insert("scope".to_owned(), scope.clone());
664                }
665
666                let headers = self.headers()?;
667                for (header_name, header_value) in headers {
668                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
669                }
670
671                iceberg_configs.insert(
672                    iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
673                    self.catalog_uri
674                        .clone()
675                        .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
676                );
677                if let Some(warehouse_path) = &self.warehouse_path {
678                    iceberg_configs.insert(
679                        iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
680                        warehouse_path.clone(),
681                    );
682                }
683                let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
684                    .load("rest", iceberg_configs)
685                    .await
686                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
687                Ok(Arc::new(catalog))
688            }
689            "glue_rust" => {
690                let mut iceberg_configs = HashMap::new();
691                // glue
692                if let Some(region) = self.glue_region() {
693                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
694                }
695                if let Some(access_key) = self.glue_access_key() {
696                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
697                }
698                if let Some(secret_key) = self.glue_secret_key() {
699                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
700                }
701                // s3
702                if let Some(region) = &self.s3_region {
703                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
704                }
705                if let Some(endpoint) = &self.s3_endpoint {
706                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
707                }
708                if let Some(access_key) = &self.s3_access_key {
709                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
710                }
711                if let Some(secret_key) = &self.s3_secret_key {
712                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
713                }
714                if let Some(role_arn) = &self.s3_iam_role_arn {
715                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
716                }
717                if let Some(path_style_access) = &self.s3_path_style_access {
718                    iceberg_configs.insert(
719                        S3_PATH_STYLE_ACCESS.to_owned(),
720                        path_style_access.to_string(),
721                    );
722                }
723                iceberg_configs.insert(
724                    iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
725                    self.warehouse_path
726                        .clone()
727                        .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
728                );
729                if let Some(uri) = self.catalog_uri.as_deref() {
730                    iceberg_configs.insert(
731                        iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
732                        uri.to_owned(),
733                    );
734                }
735                let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
736                    .load("glue", iceberg_configs)
737                    .await
738                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
739                Ok(Arc::new(catalog))
740            }
741            catalog_type
742                if catalog_type == "hive"
743                    || catalog_type == "snowflake"
744                    || catalog_type == "jdbc"
745                    || catalog_type == "rest"
746                    || catalog_type == "glue" =>
747            {
748                // Create java catalog
749                let (file_io_props, java_catalog_props) =
750                    self.build_jni_catalog_configs(java_catalog_props)?;
751                let catalog_impl = match catalog_type {
752                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
753                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
754                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
755                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
756                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
757                    _ => unreachable!(),
758                };
759
760                jni_catalog::JniCatalog::build_catalog(
761                    file_io_props,
762                    self.catalog_name(),
763                    catalog_impl,
764                    java_catalog_props,
765                )
766            }
767            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
768            _ => {
769                bail!(
770                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
771                    self.catalog_type()
772                )
773            }
774        }
775    }
776
777    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
778    pub async fn load_table(
779        &self,
780        table: &IcebergTableIdentifier,
781        java_catalog_props: &HashMap<String, String>,
782    ) -> ConnectorResult<Table> {
783        let catalog = self
784            .create_catalog(java_catalog_props)
785            .await
786            .context("Unable to load iceberg catalog")?;
787
788        let table_id = table
789            .to_table_ident()
790            .context("Unable to parse table name")?;
791
792        catalog.load_table(&table_id).await.map_err(Into::into)
793    }
794}