risingwave_connector/connector_common/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod compaction;
mod jni_catalog;
mod mock_catalog;
mod storage_catalog;

use std::collections::HashMap;
use std::sync::Arc;

use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use ::iceberg::table::Table;
use ::iceberg::{Catalog, TableIdent};
use anyhow::{Context, anyhow};
use iceberg::io::{
    AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
    GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
};
use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
use phf::{Set, phf_set};
use risingwave_common::bail;
use risingwave_common::util::env_var::env_var_is_true;
use serde::Deserialize;
use serde_with::serde_as;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
use crate::deserialize_optional_bool_from_string;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type supported by iceberg, such as "storage", "rest".
    /// If not set, we use "storage" as default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// AWS Glue catalog ID. Can be omitted for the storage catalog or when the
    /// caller's AWS account ID matches the Glue catalog ID.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Full name of table, must include schema name.
    #[serde(rename = "table.name")]
    pub table_name: String,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// Token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch a token from if the REST catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// Scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    /// Enable config load. When set to `true`, warehouse credentials are loaded from the
    /// environment. Only allowed in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The HTTP headers to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    ///
    /// Header format:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub header: Option<String>,
}
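
// Illustrative only: a hypothetical `WITH (...)` clause for a REST catalog, showing how
// its keys map onto the serde renames above. All values (and the surrounding statement)
// are made up for illustration.
//
//     WITH (
//         connector = 'iceberg',
//         catalog.type = 'rest',
//         catalog.uri = 'http://rest-catalog:8181',
//         warehouse.path = 's3://my-bucket/warehouse',
//         s3.region = 'us-east-1',
//         s3.access.key = '<access-key>',
//         s3.secret.key = '<secret-key>',
//         database.name = 'demo_db',
//         table.name = 'demo_table'
//     );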

impl EnforceSecret for IcebergCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
    };
}

impl IcebergCommon {
    pub fn catalog_type(&self) -> &str {
        self.catalog_type.as_deref().unwrap_or("storage")
    }

    pub fn catalog_name(&self) -> String {
        self.catalog_name
            .as_ref()
            .cloned()
            .unwrap_or_else(|| "risingwave".to_owned())
    }

    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
        let mut headers = HashMap::new();
        headers.insert("User-Agent".to_owned(), "RisingWave".to_owned());
        if let Some(header) = &self.header {
            for pair in header.split(';') {
                let mut parts = pair.split('=');
                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
                    headers.insert(key.to_owned(), value.to_owned());
                } else {
                    bail!("Invalid header format: {}", pair);
                }
            }
        }
        Ok(headers)
    }

    pub fn enable_config_load(&self) -> bool {
        // If the env var is set to true (cloud environment), default config load is always disabled.
        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
            return false;
        }
        self.enable_config_load.unwrap_or(false)
    }

    /// For both V1 and V2.
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        let file_io_props = {
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.region {
                // iceberg-rust
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.endpoint {
                // iceberg-rust
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            // iceberg-rust
            if let Some(access_key) = &self.access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        if (url.is_err() || is_s3_tables)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            // If the warehouse path is not a valid URL, it could be a warehouse name
                            // in a rest catalog, or an s3tables path, which is not a valid URL but a
                            // valid warehouse path, so we allow it to pass here.
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            java_catalog_configs.extend(java_catalog_props.clone());

            // Currently we only support s3, so let's set it to s3
            java_catalog_configs.insert(
                "io-impl".to_owned(),
                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
            );

            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            if let Some(region) = &self.region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            match self.catalog_type.as_deref() {
                Some("rest") => {
                    if let Some(credential) = &self.credential {
                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                Some("glue") => {
                    if !enable_config_load {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        // Use S3 ak/sk and region as glue ak/sk and region by default.
                        // TODO: use different ak/sk and region for s3 and glue.
                        if let Some(access_key) = &self.access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.clone(),
                            );
                        }
                        if let Some(secret_key) = &self.secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.clone(),
                            );
                        }
                    }

                    if let Some(region) = &self.region {
                        java_catalog_configs.insert("client.region".to_owned(), region.clone());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
}
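
// Illustrative sketch of the shape of the JNI catalog properties produced above for a
// REST catalog with SigV4 enabled. The keys are the ones inserted by
// `build_jni_catalog_configs`; the values are made up.
//
//     uri                    -> https://rest-catalog.example.com
//     warehouse              -> s3://my-bucket/warehouse
//     io-impl                -> org.apache.iceberg.aws.s3.S3FileIO
//     rest.sigv4-enabled     -> true
//     rest.signing-region    -> us-east-1
//     rest.signing-name      -> execute-api
//     rest.access-key-id     -> <s3.access.key>
//     rest.secret-access-key -> <s3.secret.key>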

impl IcebergCommon {
    pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
        let ret = if let Some(database_name) = &self.database_name {
            TableIdent::from_strs(vec![database_name, &self.table_name])
        } else {
            TableIdent::from_strs(vec![&self.table_name])
        };

        Ok(ret.context("Failed to create table identifier")?)
    }

    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink is different, so pass them separately to be safer.
    pub async fn create_catalog(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<Arc<dyn Catalog>> {
        match self.catalog_type() {
            "storage" => {
                let warehouse = self
                    .warehouse_path
                    .clone()
                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
                let url = Url::parse(warehouse.as_ref())
                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;

                let config = match url.scheme() {
                    "s3" | "s3a" => StorageCatalogConfig::S3(
                        storage_catalog::StorageCatalogS3Config::builder()
                            .warehouse(warehouse)
                            .access_key(self.access_key.clone())
                            .secret_key(self.secret_key.clone())
                            .region(self.region.clone())
                            .endpoint(self.endpoint.clone())
                            .path_style_access(self.path_style_access)
                            .enable_config_load(Some(self.enable_config_load()))
                            .build(),
                    ),
                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
                        storage_catalog::StorageCatalogGcsConfig::builder()
                            .warehouse(warehouse)
                            .credential(self.gcs_credential.clone())
                            .enable_config_load(Some(self.enable_config_load()))
                            .build(),
                    ),
                    "azblob" => StorageCatalogConfig::Azblob(
                        storage_catalog::StorageCatalogAzblobConfig::builder()
                            .warehouse(warehouse)
                            .account_name(self.azblob_account_name.clone())
                            .account_key(self.azblob_account_key.clone())
                            .endpoint(self.azblob_endpoint_url.clone())
                            .build(),
                    ),
                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
                };

                let catalog = storage_catalog::StorageCatalog::new(config)?;
                Ok(Arc::new(catalog))
            }
            "rest_rust" => {
                let mut iceberg_configs = HashMap::new();

                // check gcs credential or s3 access key and secret key
                if let Some(gcs_credential) = &self.gcs_credential {
                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                } else {
                    if let Some(region) = &self.region {
                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
                    }
                    if let Some(endpoint) = &self.endpoint {
                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
                    }
                    if let Some(access_key) = &self.access_key {
                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
                    }
                    if let Some(secret_key) = &self.secret_key {
                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                    }
                    if let Some(path_style_access) = &self.path_style_access {
                        iceberg_configs.insert(
                            S3_PATH_STYLE_ACCESS.to_owned(),
                            path_style_access.to_string(),
                        );
                    }
                };

                if let Some(credential) = &self.credential {
                    iceberg_configs.insert("credential".to_owned(), credential.clone());
                }
                if let Some(token) = &self.token {
                    iceberg_configs.insert("token".to_owned(), token.clone());
                }
                if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
                    iceberg_configs
                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                }
                if let Some(scope) = &self.scope {
                    iceberg_configs.insert("scope".to_owned(), scope.clone());
                }

                let headers = self.headers()?;
                for (header_name, header_value) in headers {
                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
                }

                let config_builder =
                    iceberg_catalog_rest::RestCatalogConfig::builder()
                        .uri(self.catalog_uri.clone().with_context(|| {
                            "`catalog.uri` must be set in rest catalog".to_owned()
                        })?)
                        .props(iceberg_configs);

                let config = match &self.warehouse_path {
                    Some(warehouse_path) => {
                        config_builder.warehouse(warehouse_path.clone()).build()
                    }
                    None => config_builder.build(),
                };
                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
                Ok(Arc::new(catalog))
            }
            "glue_rust" => {
                let mut iceberg_configs = HashMap::new();
                // glue
                if let Some(region) = &self.region {
                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
                }
                if let Some(access_key) = &self.access_key {
                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
                }
                if let Some(secret_key) = &self.secret_key {
                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                }
                // s3
                if let Some(region) = &self.region {
                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
                }
                if let Some(endpoint) = &self.endpoint {
                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
                }
                if let Some(access_key) = &self.access_key {
                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
                }
                if let Some(secret_key) = &self.secret_key {
                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                }
                if let Some(path_style_access) = &self.path_style_access {
                    iceberg_configs.insert(
                        S3_PATH_STYLE_ACCESS.to_owned(),
                        path_style_access.to_string(),
                    );
                }
                let config_builder =
                    iceberg_catalog_glue::GlueCatalogConfig::builder()
                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
                            anyhow!("`warehouse.path` must be set in glue catalog")
                        })?)
                        .props(iceberg_configs);
                let config = if let Some(uri) = self.catalog_uri.as_deref() {
                    config_builder.uri(uri.to_owned()).build()
                } else {
                    config_builder.build()
                };
                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
                Ok(Arc::new(catalog))
            }
            catalog_type
                if catalog_type == "hive"
                    || catalog_type == "snowflake"
                    || catalog_type == "jdbc"
                    || catalog_type == "rest"
                    || catalog_type == "glue" =>
            {
                // Create java catalog
                let (file_io_props, java_catalog_props) =
                    self.build_jni_catalog_configs(java_catalog_props)?;
                let catalog_impl = match catalog_type {
                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
                    _ => unreachable!(),
                };

                jni_catalog::JniCatalog::build_catalog(
                    file_io_props,
                    self.catalog_name(),
                    catalog_impl,
                    java_catalog_props,
                )
            }
            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
            _ => {
                bail!(
                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
                    self.catalog_type()
                )
            }
        }
    }

    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink is different, so pass them separately to be safer.
    pub async fn load_table(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<Table> {
        let catalog = self
            .create_catalog(java_catalog_props)
            .await
            .context("Unable to load iceberg catalog")?;

        let table_id = self
            .full_table_name()
            .context("Unable to parse table name")?;

        catalog.load_table(&table_id).await.map_err(Into::into)
    }
}
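
// A minimal sketch of how `catalog.header` is parsed and how the documented defaults
// behave. It assumes `serde_json` is available to this crate and that deserializing
// `IcebergCommon` from a JSON map of string options mirrors how `WITH` options are
// handled elsewhere; the module name, table name, and header values are illustrative.
#[cfg(test)]
mod iceberg_common_option_tests {
    use super::*;

    #[test]
    fn parse_catalog_header_and_defaults() {
        let common: IcebergCommon = serde_json::from_value(serde_json::json!({
            "table.name": "demo_table",
            "catalog.header": "key1=value1;key2=value2",
        }))
        .expect("deserialize IcebergCommon from options");

        // `headers()` always injects a default `User-Agent` and then parses the
        // ';'-separated `key=value` pairs from `catalog.header`.
        let headers = common.headers().expect("parse catalog.header");
        assert_eq!(
            headers.get("User-Agent").map(String::as_str),
            Some("RisingWave")
        );
        assert_eq!(headers.get("key1").map(String::as_str), Some("value1"));
        assert_eq!(headers.get("key2").map(String::as_str), Some("value2"));

        // Unset options fall back to their documented defaults.
        assert_eq!(common.catalog_type(), "storage");
        assert_eq!(common.catalog_name(), "risingwave");
    }
}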