risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::{
30    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
31    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
32};
33use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
34use phf::{Set, phf_set};
35use risingwave_common::bail;
36use risingwave_common::util::deployment::Deployment;
37use risingwave_common::util::env_var::env_var_is_true;
38use serde::Deserialize;
39use serde_with::serde_as;
40use url::Url;
41use with_options::WithOptions;
42
43use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
44use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
45use crate::deserialize_optional_bool_from_string;
46use crate::enforce_secret::EnforceSecret;
47use crate::error::ConnectorResult;
48
/// Options common to Iceberg connectors: which catalog to use, how to reach it, and the
/// object-store settings used to read and write table files.
///
/// Each field is deserialized from the option key given in its `#[serde(rename = "...")]`
/// attribute (or from the field name when no rename is present).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 settings. The region, access key and secret key also serve as fallbacks for the
    // corresponding `glue.*` options (see `glue_region`, `glue_access_key`, `glue_secret_key`).
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue catalog settings. When the access key, secret key or region is unset, the matching
    // `s3.*` value is used instead.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential, forwarded to the file IO as `GCS_CREDENTIALS_JSON`.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings. `build_jni_catalog_configs` applies them only when all three are set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings. `build_jni_catalog_configs` applies the account name and key only
    // when both are set.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded to the file IO as `S3_PATH_STYLE_ACCESS` and to the Java catalog as
    // `s3.path-style-access` when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The http header to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for iceberg REST catalog.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,
}
170
171impl EnforceSecret for IcebergCommon {
172    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
173        "s3.access.key",
174        "s3.secret.key",
175        "gcs.credential",
176        "catalog.credential",
177        "catalog.token",
178        "catalog.oauth2_server_uri",
179        "adlsgen2.account_key",
180        "adlsgen2.client_secret",
181        "glue.access.key",
182        "glue.secret.key",
183    };
184}
185
/// Names the Iceberg table to operate on: an optional database name plus a table name.
///
/// Deserialization rejects any option key other than `database.name` and `table.name`
/// (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional database name; when present it is the first component passed to
    // `TableIdent::from_strs` in `to_table_ident`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Full name of table, must include schema name when database is provided.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
196
197impl IcebergTableIdentifier {
198    pub fn database_name(&self) -> Option<&str> {
199        self.database_name.as_deref()
200    }
201
202    pub fn table_name(&self) -> &str {
203        &self.table_name
204    }
205
206    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
207        let ret = if let Some(database_name) = &self.database_name {
208            TableIdent::from_strs(vec![database_name, &self.table_name])
209        } else {
210            TableIdent::from_strs(vec![&self.table_name])
211        };
212
213        Ok(ret.context("Failed to create table identifier")?)
214    }
215}
216
217impl IcebergCommon {
218    pub fn catalog_type(&self) -> &str {
219        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
220        if self.vended_credentials() && catalog_type == "rest" {
221            "rest_rust"
222        } else {
223            catalog_type
224        }
225    }
226
227    pub fn vended_credentials(&self) -> bool {
228        self.vended_credentials.unwrap_or(false)
229    }
230
231    fn glue_access_key(&self) -> Option<&str> {
232        self.glue_access_key
233            .as_deref()
234            .or(self.s3_access_key.as_deref())
235    }
236
237    fn glue_secret_key(&self) -> Option<&str> {
238        self.glue_secret_key
239            .as_deref()
240            .or(self.s3_secret_key.as_deref())
241    }
242
243    fn glue_region(&self) -> Option<&str> {
244        self.glue_region.as_deref().or(self.s3_region.as_deref())
245    }
246
247    pub fn catalog_name(&self) -> String {
248        self.catalog_name
249            .as_ref()
250            .cloned()
251            .unwrap_or_else(|| "risingwave".to_owned())
252    }
253
254    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
255        let mut headers = HashMap::new();
256        let user_agent = match Deployment::current() {
257            Deployment::Ci => "RisingWave(CI)".to_owned(),
258            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
259            Deployment::Other => "RisingWave(OSS)".to_owned(),
260        };
261        if self.vended_credentials() {
262            headers.insert(
263                "X-Iceberg-Access-Delegation".to_owned(),
264                "vended-credentials".to_owned(),
265            );
266        }
267        headers.insert("User-Agent".to_owned(), user_agent);
268        if let Some(header) = &self.catalog_header {
269            for pair in header.split(';') {
270                let mut parts = pair.split('=');
271                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
272                    headers.insert(key.to_owned(), value.to_owned());
273                } else {
274                    bail!("Invalid header format: {}", pair);
275                }
276            }
277        }
278        Ok(headers)
279    }
280
281    pub fn enable_config_load(&self) -> bool {
282        // If the env var is set to true, we disable the default config load. (Cloud environment)
283        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
284            if matches!(self.enable_config_load, Some(true)) {
285                tracing::warn!(
286                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
287                );
288            }
289            return false;
290        }
291        self.enable_config_load.unwrap_or(false)
292    }
293
294    /// For both V1 and V2.
295    fn build_jni_catalog_configs(
296        &self,
297        java_catalog_props: &HashMap<String, String>,
298    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
299        let mut iceberg_configs = HashMap::new();
300        let enable_config_load = self.enable_config_load();
301        let file_io_props = {
302            let catalog_type = self.catalog_type().to_owned();
303
304            if let Some(region) = &self.s3_region {
305                // iceberg-rust
306                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
307            }
308
309            if let Some(endpoint) = &self.s3_endpoint {
310                // iceberg-rust
311                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
312            }
313
314            // iceberg-rust
315            if let Some(access_key) = &self.s3_access_key {
316                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
317            }
318            if let Some(secret_key) = &self.s3_secret_key {
319                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
320            }
321            if let Some(role_arn) = &self.s3_iam_role_arn {
322                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
323            }
324            if let Some(gcs_credential) = &self.gcs_credential {
325                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
326                if catalog_type != "rest" && catalog_type != "rest_rust" {
327                    bail!("gcs unsupported in {} catalog", &catalog_type);
328                }
329            }
330
331            if let (
332                Some(azblob_account_name),
333                Some(azblob_account_key),
334                Some(azblob_endpoint_url),
335            ) = (
336                &self.azblob_account_name,
337                &self.azblob_account_key,
338                &self.azblob_endpoint_url,
339            ) {
340                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
341                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
342                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
343
344                if catalog_type != "rest" && catalog_type != "rest_rust" {
345                    bail!("azblob unsupported in {} catalog", &catalog_type);
346                }
347            }
348
349            if let (Some(account_name), Some(account_key)) = (
350                self.adlsgen2_account_name.as_ref(),
351                self.adlsgen2_account_key.as_ref(),
352            ) {
353                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
354                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
355                if catalog_type != "rest" && catalog_type != "rest_rust" {
356                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
357                }
358            }
359
360            match &self.warehouse_path {
361                Some(warehouse_path) => {
362                    let (bucket, _) = {
363                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
364                        let url = Url::parse(warehouse_path);
365                        if (url.is_err() || is_s3_tables)
366                            && (catalog_type == "rest" || catalog_type == "rest_rust")
367                        {
368                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
369                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
370                            // so we allow it to pass here.
371                            (None, None)
372                        } else {
373                            let url = url.with_context(|| {
374                                format!("Invalid warehouse path: {}", warehouse_path)
375                            })?;
376                            let bucket = url
377                                .host_str()
378                                .with_context(|| {
379                                    format!(
380                                        "Invalid s3 path: {}, bucket is missing",
381                                        warehouse_path
382                                    )
383                                })?
384                                .to_owned();
385                            let root = url.path().trim_start_matches('/').to_owned();
386                            (Some(bucket), Some(root))
387                        }
388                    };
389
390                    if let Some(bucket) = bucket {
391                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
392                    }
393                }
394                None => {
395                    if catalog_type != "rest" && catalog_type != "rest_rust" {
396                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
397                    }
398                }
399            }
400            iceberg_configs.insert(
401                S3_DISABLE_CONFIG_LOAD.to_owned(),
402                (!enable_config_load).to_string(),
403            );
404
405            iceberg_configs.insert(
406                GCS_DISABLE_CONFIG_LOAD.to_owned(),
407                (!enable_config_load).to_string(),
408            );
409
410            if let Some(path_style_access) = self.s3_path_style_access {
411                iceberg_configs.insert(
412                    S3_PATH_STYLE_ACCESS.to_owned(),
413                    path_style_access.to_string(),
414                );
415            }
416
417            iceberg_configs
418        };
419
420        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
421        let mut java_catalog_configs = HashMap::new();
422        {
423            if let Some(uri) = self.catalog_uri.as_deref() {
424                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
425            }
426
427            if let Some(warehouse_path) = &self.warehouse_path {
428                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
429            }
430            java_catalog_configs.extend(java_catalog_props.clone());
431
432            // Currently we only support s3, so let's set it to s3
433            java_catalog_configs.insert(
434                "io-impl".to_owned(),
435                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
436            );
437
438            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
439            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
440
441            if let Some(region) = &self.s3_region {
442                java_catalog_configs.insert("client.region".to_owned(), region.clone());
443            }
444            if let Some(endpoint) = &self.s3_endpoint {
445                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
446            }
447
448            if let Some(access_key) = &self.s3_access_key {
449                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
450            }
451            if let Some(secret_key) = &self.s3_secret_key {
452                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
453            }
454
455            if let Some(path_style_access) = &self.s3_path_style_access {
456                java_catalog_configs.insert(
457                    "s3.path-style-access".to_owned(),
458                    path_style_access.to_string(),
459                );
460            }
461
462            let headers = self.headers()?;
463            for (header_name, header_value) in headers {
464                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
465            }
466
467            match self.catalog_type() {
468                "rest" => {
469                    if let Some(credential) = &self.catalog_credential {
470                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
471                    }
472                    if let Some(token) = &self.catalog_token {
473                        java_catalog_configs.insert("token".to_owned(), token.clone());
474                    }
475                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
476                        java_catalog_configs
477                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
478                    }
479                    if let Some(scope) = &self.catalog_scope {
480                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
481                    }
482                    if let Some(rest_signing_region) = &self.rest_signing_region {
483                        java_catalog_configs.insert(
484                            "rest.signing-region".to_owned(),
485                            rest_signing_region.clone(),
486                        );
487                    }
488                    if let Some(rest_signing_name) = &self.rest_signing_name {
489                        java_catalog_configs
490                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
491                    }
492                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
493                        java_catalog_configs.insert(
494                            "rest.sigv4-enabled".to_owned(),
495                            rest_sigv4_enabled.to_string(),
496                        );
497
498                        if let Some(access_key) = &self.s3_access_key {
499                            java_catalog_configs
500                                .insert("rest.access-key-id".to_owned(), access_key.clone());
501                        }
502
503                        if let Some(secret_key) = &self.s3_secret_key {
504                            java_catalog_configs
505                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
506                        }
507                    }
508                }
509                "glue" => {
510                    let glue_access_key = self.glue_access_key();
511                    let glue_secret_key = self.glue_secret_key();
512                    let has_glue_credentials =
513                        glue_access_key.is_some() && glue_secret_key.is_some();
514                    let should_configure_glue_provider = !enable_config_load
515                        || has_glue_credentials
516                        || self.glue_iam_role_arn.is_some();
517
518                    if should_configure_glue_provider {
519                        java_catalog_configs.insert(
520                            "client.credentials-provider".to_owned(),
521                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
522                        );
523                        if let Some(region) = self.glue_region() {
524                            java_catalog_configs.insert(
525                                "client.credentials-provider.glue.region".to_owned(),
526                                region.to_owned(),
527                            );
528                        }
529                        if let Some(access_key) = glue_access_key {
530                            java_catalog_configs.insert(
531                                "client.credentials-provider.glue.access-key-id".to_owned(),
532                                access_key.to_owned(),
533                            );
534                        }
535                        if let Some(secret_key) = glue_secret_key {
536                            java_catalog_configs.insert(
537                                "client.credentials-provider.glue.secret-access-key".to_owned(),
538                                secret_key.to_owned(),
539                            );
540                        }
541                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
542                            java_catalog_configs.insert(
543                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
544                                role_arn.to_owned(),
545                            );
546                        }
547                        if enable_config_load && !has_glue_credentials {
548                            java_catalog_configs.insert(
549                                "client.credentials-provider.glue.use-default-credential-chain"
550                                    .to_owned(),
551                                "true".to_owned(),
552                            );
553                        }
554                    }
555
556                    if let Some(region) = self.glue_region() {
557                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
558                        java_catalog_configs.insert(
559                            "glue.endpoint".to_owned(),
560                            format!("https://glue.{}.amazonaws.com", region),
561                        );
562                    }
563
564                    if let Some(glue_id) = self.glue_id.as_deref() {
565                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
566                    }
567                }
568                _ => {}
569            }
570        }
571
572        Ok((file_io_props, java_catalog_configs))
573    }
574}
575
576impl IcebergCommon {
577    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
578    pub async fn create_catalog(
579        &self,
580        java_catalog_props: &HashMap<String, String>,
581    ) -> ConnectorResult<Arc<dyn Catalog>> {
582        match self.catalog_type() {
583            "storage" => {
584                let warehouse = self
585                    .warehouse_path
586                    .clone()
587                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
588                let url = Url::parse(warehouse.as_ref())
589                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
590
591                let config = match url.scheme() {
592                    "s3" | "s3a" => StorageCatalogConfig::S3(
593                        storage_catalog::StorageCatalogS3Config::builder()
594                            .warehouse(warehouse)
595                            .access_key(self.s3_access_key.clone())
596                            .secret_key(self.s3_secret_key.clone())
597                            .region(self.s3_region.clone())
598                            .endpoint(self.s3_endpoint.clone())
599                            .path_style_access(self.s3_path_style_access)
600                            .enable_config_load(Some(self.enable_config_load()))
601                            .build(),
602                    ),
603                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
604                        storage_catalog::StorageCatalogGcsConfig::builder()
605                            .warehouse(warehouse)
606                            .credential(self.gcs_credential.clone())
607                            .enable_config_load(Some(self.enable_config_load()))
608                            .build(),
609                    ),
610                    "azblob" => StorageCatalogConfig::Azblob(
611                        storage_catalog::StorageCatalogAzblobConfig::builder()
612                            .warehouse(warehouse)
613                            .account_name(self.azblob_account_name.clone())
614                            .account_key(self.azblob_account_key.clone())
615                            .endpoint(self.azblob_endpoint_url.clone())
616                            .build(),
617                    ),
618                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
619                };
620
621                let catalog = storage_catalog::StorageCatalog::new(config)?;
622                Ok(Arc::new(catalog))
623            }
624            "rest_rust" => {
625                let mut iceberg_configs = HashMap::new();
626
627                // check gcs credential or s3 access key and secret key
628                if let Some(gcs_credential) = &self.gcs_credential {
629                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
630                } else {
631                    if let Some(region) = &self.s3_region {
632                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
633                    }
634                    if let Some(endpoint) = &self.s3_endpoint {
635                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
636                    }
637                    if let Some(access_key) = &self.s3_access_key {
638                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
639                    }
640                    if let Some(secret_key) = &self.s3_secret_key {
641                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
642                    }
643                    if let Some(path_style_access) = &self.s3_path_style_access {
644                        iceberg_configs.insert(
645                            S3_PATH_STYLE_ACCESS.to_owned(),
646                            path_style_access.to_string(),
647                        );
648                    }
649                };
650
651                if let Some(credential) = &self.catalog_credential {
652                    iceberg_configs.insert("credential".to_owned(), credential.clone());
653                }
654                if let Some(token) = &self.catalog_token {
655                    iceberg_configs.insert("token".to_owned(), token.clone());
656                }
657                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
658                    iceberg_configs
659                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
660                }
661                if let Some(scope) = &self.catalog_scope {
662                    iceberg_configs.insert("scope".to_owned(), scope.clone());
663                }
664
665                let headers = self.headers()?;
666                for (header_name, header_value) in headers {
667                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
668                }
669
670                let config_builder =
671                    iceberg_catalog_rest::RestCatalogConfig::builder()
672                        .uri(self.catalog_uri.clone().with_context(|| {
673                            "`catalog.uri` must be set in rest catalog".to_owned()
674                        })?)
675                        .props(iceberg_configs);
676
677                let config = match &self.warehouse_path {
678                    Some(warehouse_path) => {
679                        config_builder.warehouse(warehouse_path.clone()).build()
680                    }
681                    None => config_builder.build(),
682                };
683                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
684                Ok(Arc::new(catalog))
685            }
686            "glue_rust" => {
687                let mut iceberg_configs = HashMap::new();
688                // glue
689                if let Some(region) = self.glue_region() {
690                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
691                }
692                if let Some(access_key) = self.glue_access_key() {
693                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
694                }
695                if let Some(secret_key) = self.glue_secret_key() {
696                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
697                }
698                // s3
699                if let Some(region) = &self.s3_region {
700                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
701                }
702                if let Some(endpoint) = &self.s3_endpoint {
703                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
704                }
705                if let Some(access_key) = &self.s3_access_key {
706                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
707                }
708                if let Some(secret_key) = &self.s3_secret_key {
709                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
710                }
711                if let Some(role_arn) = &self.s3_iam_role_arn {
712                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
713                }
714                if let Some(path_style_access) = &self.s3_path_style_access {
715                    iceberg_configs.insert(
716                        S3_PATH_STYLE_ACCESS.to_owned(),
717                        path_style_access.to_string(),
718                    );
719                }
720                let config_builder =
721                    iceberg_catalog_glue::GlueCatalogConfig::builder()
722                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
723                            anyhow!("`warehouse.path` must be set in glue catalog")
724                        })?)
725                        .props(iceberg_configs);
726                let config = if let Some(uri) = self.catalog_uri.as_deref() {
727                    config_builder.uri(uri.to_owned()).build()
728                } else {
729                    config_builder.build()
730                };
731                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
732                Ok(Arc::new(catalog))
733            }
734            catalog_type
735                if catalog_type == "hive"
736                    || catalog_type == "snowflake"
737                    || catalog_type == "jdbc"
738                    || catalog_type == "rest"
739                    || catalog_type == "glue" =>
740            {
741                // Create java catalog
742                let (file_io_props, java_catalog_props) =
743                    self.build_jni_catalog_configs(java_catalog_props)?;
744                let catalog_impl = match catalog_type {
745                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
746                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
747                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
748                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
749                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
750                    _ => unreachable!(),
751                };
752
753                jni_catalog::JniCatalog::build_catalog(
754                    file_io_props,
755                    self.catalog_name(),
756                    catalog_impl,
757                    java_catalog_props,
758                )
759            }
760            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
761            _ => {
762                bail!(
763                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
764                    self.catalog_type()
765                )
766            }
767        }
768    }
769
770    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
771    pub async fn load_table(
772        &self,
773        table: &IcebergTableIdentifier,
774        java_catalog_props: &HashMap<String, String>,
775    ) -> ConnectorResult<Table> {
776        let catalog = self
777            .create_catalog(java_catalog_props)
778            .await
779            .context("Unable to load iceberg catalog")?;
780
781        let table_id = table
782            .to_table_ident()
783            .context("Unable to parse table name")?;
784
785        catalog.load_table(&table_id).await.map_err(Into::into)
786    }
787}