risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type supported by iceberg, such as "storage", "rest", "glue" or "jdbc".
    /// If not set, "storage" is used as the default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also used as the fallback for `glue.region`.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    /// Custom S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    /// S3 access key. Also used as the fallback for `glue.access.key`, and as the access key
    /// for REST catalog `SigV4` signing.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    /// S3 secret key. Also used as the fallback for `glue.secret.key`, and as the secret key
    /// for REST catalog `SigV4` signing.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    /// IAM role ARN to assume for S3 access. For `jdbc` catalogs it also configures the Java
    /// client's assume-role client factory.
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    /// Access key for the Glue catalog; falls back to `s3.access.key` when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    /// Secret key for the Glue catalog; falls back to `s3.secret.key` when unset.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    /// IAM role ARN handed to the RisingWave Glue credential provider.
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    /// Glue region; falls back to `s3.region` when unset.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// Glue catalog id (an AWS account id). Can be omitted for the storage catalog or when
    /// the caller's AWS account ID matches the Glue catalog id.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    /// GCS credentials, as JSON.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob Storage account name.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob Storage account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob Storage endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// Azure Data Lake Storage Gen2 account name.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    /// Azure Data Lake Storage Gen2 account key.
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    /// Azure Data Lake Storage Gen2 endpoint.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of the Iceberg warehouse. Required for non-REST catalogs; REST catalogs may
    /// instead use a warehouse name, an S3 Tables ARN or a `bq://` path.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Whether to use path-style access for S3.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. When set to true, warehouse credentials are loaded from the
    /// environment. Only allowed in a self-hosted environment: it is ignored (with a warning)
    /// when default credentials are disabled.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The http header to be used in the catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// Format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for iceberg REST catalog.
    /// With a `rest` catalog this switches the effective catalog type to `rest_rust` and
    /// sends the `X-Iceberg-Access-Delegation: vended-credentials` header.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    /// Security type for REST catalog authentication.
    /// Supported values: `none`, `oauth2`, `google`.
    /// When set to `google`, uses Iceberg's `GoogleAuthManager` (requires Iceberg 1.10+)
    /// for authentication using Google Application Default Credentials (ADC).
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    /// OAuth-based scopes for Google authentication.
    /// Comma-separated list of OAuth-based scopes to request.
    /// Only applicable when `catalog.security` is set to `google`.
    /// Default: <https://www.googleapis.com/auth/cloud-platform>
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    /// Custom `FileIO` implementation class for the Iceberg catalog.
    /// Allows specifying a custom `FileIO` implementation instead of the default.
    /// Examples:
    /// - `org.apache.iceberg.aws.s3.S3FileIO` for Amazon S3 (default)
    /// - `org.apache.iceberg.gcp.gcs.GCSFileIO` for Google Cloud Storage
    /// - `org.apache.iceberg.azure.adlsv2.ADLSFileIO` for Azure Data Lake Storage Gen2
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
197
// Per-table object cache size, mirroring the default of `iceberg::io::object_cache` (32 MiB).
// TODO: change it after the object cache gets refactored.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
// Memory budget for the shared object cache (512 MiB).
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
// How many default-sized per-table caches fit into the shared budget (512 MiB / 32 MiB = 16).
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
204
205impl EnforceSecret for IcebergCommon {
206    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
207        "s3.access.key",
208        "s3.secret.key",
209        "gcs.credential",
210        "catalog.credential",
211        "catalog.token",
212        "catalog.oauth2_server_uri",
213        "adlsgen2.account_key",
214        "adlsgen2.client_secret",
215        "glue.access.key",
216        "glue.secret.key",
217    };
218}
219
/// Identifies an Iceberg table through the `database.name` and `table.name` options.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    /// Optional database (namespace) name. Dots are treated as Iceberg namespace
    /// separators; its parts precede those of `table.name`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name or namespace-qualified table name. Dots are treated as
    /// Iceberg namespace separators.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
231
232impl IcebergTableIdentifier {
233    pub fn database_name(&self) -> Option<&str> {
234        self.database_name.as_deref()
235    }
236
237    pub fn table_name(&self) -> &str {
238        &self.table_name
239    }
240
241    fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
242        let mut parts = Vec::new();
243        if let Some(database_name) = &self.database_name {
244            parts.extend(database_name.split('.'));
245        }
246        parts.extend(self.table_name.split('.'));
247
248        if parts.iter().any(|part| part.is_empty()) {
249            bail!(
250                "Invalid iceberg table identifier '{}': identifier parts must not be empty",
251                self.full_identifier()
252            );
253        }
254
255        Ok(parts)
256    }
257
258    fn full_identifier(&self) -> String {
259        match &self.database_name {
260            Some(database_name) => format!("{}.{}", database_name, self.table_name),
261            None => self.table_name.clone(),
262        }
263    }
264
265    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
266        let ret = TableIdent::from_strs(self.identifier_parts()?);
267
268        Ok(ret.context("Failed to create table identifier")?)
269    }
270
271    pub fn validate(&self) -> ConnectorResult<()> {
272        self.identifier_parts().map(|_| ())
273    }
274}
275
276impl IcebergCommon {
277    pub fn catalog_type(&self) -> &str {
278        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
279        if self.vended_credentials() && catalog_type == "rest" {
280            "rest_rust"
281        } else {
282            catalog_type
283        }
284    }
285
286    pub fn vended_credentials(&self) -> bool {
287        self.vended_credentials.unwrap_or(false)
288    }
289
290    fn glue_access_key(&self) -> Option<&str> {
291        self.glue_access_key
292            .as_deref()
293            .or(self.s3_access_key.as_deref())
294    }
295
296    fn glue_secret_key(&self) -> Option<&str> {
297        self.glue_secret_key
298            .as_deref()
299            .or(self.s3_secret_key.as_deref())
300    }
301
302    fn glue_region(&self) -> Option<&str> {
303        self.glue_region.as_deref().or(self.s3_region.as_deref())
304    }
305
306    pub fn catalog_name(&self) -> String {
307        self.catalog_name
308            .as_ref()
309            .cloned()
310            .unwrap_or_else(|| "risingwave".to_owned())
311    }
312
313    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
314        let mut headers = HashMap::new();
315        let user_agent = match Deployment::current() {
316            Deployment::Ci => "RisingWave(CI)".to_owned(),
317            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
318            Deployment::Other => "RisingWave(OSS)".to_owned(),
319        };
320        if self.vended_credentials() {
321            headers.insert(
322                "X-Iceberg-Access-Delegation".to_owned(),
323                "vended-credentials".to_owned(),
324            );
325        }
326        headers.insert("User-Agent".to_owned(), user_agent);
327        if let Some(header) = &self.catalog_header {
328            for pair in header.split(';') {
329                let mut parts = pair.split('=');
330                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
331                    headers.insert(key.to_owned(), value.to_owned());
332                } else {
333                    bail!("Invalid header format: {}", pair);
334                }
335            }
336        }
337        Ok(headers)
338    }
339
340    pub fn enable_config_load(&self) -> bool {
341        // If the env var is set to true, we disable the default config load. (Cloud environment)
342        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
343            if matches!(self.enable_config_load, Some(true)) {
344                tracing::warn!(
345                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
346                );
347            }
348            return false;
349        }
350        self.enable_config_load.unwrap_or(false)
351    }
352
353    /// For both V1 and V2.
354    fn build_jni_catalog_configs(
355        &self,
356        java_catalog_props: &HashMap<String, String>,
357    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
358        let mut iceberg_configs = HashMap::new();
359        let enable_config_load = self.enable_config_load();
360        let file_io_props = {
361            let catalog_type = self.catalog_type().to_owned();
362
363            if let Some(region) = &self.s3_region {
364                // iceberg-rust
365                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
366            }
367
368            if let Some(endpoint) = &self.s3_endpoint {
369                // iceberg-rust
370                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
371            }
372
373            // iceberg-rust
374            if let Some(access_key) = &self.s3_access_key {
375                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
376            }
377            if let Some(secret_key) = &self.s3_secret_key {
378                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
379            }
380            if let Some(role_arn) = &self.s3_iam_role_arn {
381                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
382            }
383            if let Some(gcs_credential) = &self.gcs_credential {
384                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
385                if catalog_type != "rest" && catalog_type != "rest_rust" {
386                    bail!("gcs unsupported in {} catalog", &catalog_type);
387                }
388            }
389
390            if let (
391                Some(azblob_account_name),
392                Some(azblob_account_key),
393                Some(azblob_endpoint_url),
394            ) = (
395                &self.azblob_account_name,
396                &self.azblob_account_key,
397                &self.azblob_endpoint_url,
398            ) {
399                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
400                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
401                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
402
403                if catalog_type != "rest" && catalog_type != "rest_rust" {
404                    bail!("azblob unsupported in {} catalog", &catalog_type);
405                }
406            }
407
408            if let (Some(account_name), Some(account_key)) = (
409                self.adlsgen2_account_name.as_ref(),
410                self.adlsgen2_account_key.as_ref(),
411            ) {
412                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
413                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
414                if catalog_type != "rest" && catalog_type != "rest_rust" {
415                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
416                }
417            }
418
419            match &self.warehouse_path {
420                Some(warehouse_path) => {
421                    let (bucket, _) = {
422                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
423                        // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
424                        let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
425                        let url = Url::parse(warehouse_path);
426                        if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
427                            && (catalog_type == "rest" || catalog_type == "rest_rust")
428                        {
429                            // If the warehouse path is not a valid URL, it could be:
430                            // - A warehouse name in REST catalog
431                            // - An S3 Tables path (arn:aws:s3tables:...)
432                            // - A BigLake path (bq://projects/...) for Google Cloud BigQuery integration
433                            // We allow these to pass through for REST catalogs.
434                            (None, None)
435                        } else {
436                            let url = url.with_context(|| {
437                                format!("Invalid warehouse path: {}", warehouse_path)
438                            })?;
439                            let bucket = url
440                                .host_str()
441                                .with_context(|| {
442                                    format!(
443                                        "Invalid s3 path: {}, bucket is missing",
444                                        warehouse_path
445                                    )
446                                })?
447                                .to_owned();
448                            let root = url.path().trim_start_matches('/').to_owned();
449                            (Some(bucket), Some(root))
450                        }
451                    };
452
453                    if let Some(bucket) = bucket {
454                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
455                    }
456                }
457                None => {
458                    if catalog_type != "rest" && catalog_type != "rest_rust" {
459                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
460                    }
461                }
462            }
463            iceberg_configs.insert(
464                S3_DISABLE_CONFIG_LOAD.to_owned(),
465                (!enable_config_load).to_string(),
466            );
467
468            iceberg_configs.insert(
469                GCS_DISABLE_CONFIG_LOAD.to_owned(),
470                (!enable_config_load).to_string(),
471            );
472
473            if let Some(path_style_access) = self.s3_path_style_access {
474                iceberg_configs.insert(
475                    S3_PATH_STYLE_ACCESS.to_owned(),
476                    path_style_access.to_string(),
477                );
478            }
479
480            iceberg_configs
481        };
482
483        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
484        let mut java_catalog_configs = HashMap::new();
485        {
486            if let Some(uri) = self.catalog_uri.as_deref() {
487                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
488            }
489
490            if let Some(warehouse_path) = &self.warehouse_path {
491                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
492            }
493            java_catalog_configs.extend(java_catalog_props.clone());
494
495            // Set io-impl: use custom io-impl if provided, otherwise default to S3FileIO
496            let io_impl = self
497                .catalog_io_impl
498                .clone()
499                .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
500            java_catalog_configs.insert("io-impl".to_owned(), io_impl);
501
502            // suppress log of FileIO like: Unclosed FileIO instance created by...
503            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
504
505            if let Some(region) = &self.s3_region {
506                java_catalog_configs.insert("client.region".to_owned(), region.clone());
507            }
508            if let Some(endpoint) = &self.s3_endpoint {
509                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
510            }
511
512            if let Some(access_key) = &self.s3_access_key {
513                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
514            }
515            if let Some(secret_key) = &self.s3_secret_key {
516                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
517            }
518
519            if let Some(path_style_access) = &self.s3_path_style_access {
520                java_catalog_configs.insert(
521                    "s3.path-style-access".to_owned(),
522                    path_style_access.to_string(),
523                );
524            }
525
526            let headers = self.headers()?;
527            for (header_name, header_value) in headers {
528                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
529            }
530
531            match self.catalog_type() {
532                "rest" => {
533                    // Handle security type for REST catalog (Iceberg 1.10+)
534                    if let Some(security) = &self.catalog_security {
535                        match security.to_lowercase().as_str() {
536                            "google" => {
537                                // Google AuthManager (Iceberg 1.10+) - uses Google ADC
538                                java_catalog_configs.insert(
539                                    "rest.auth.type".to_owned(),
540                                    "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
541                                );
542                                // Set GCP auth scopes if provided
543                                if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
544                                    java_catalog_configs.insert(
545                                        "gcp.auth.scopes".to_owned(),
546                                        gcp_auth_scopes.clone(),
547                                    );
548                                }
549                            }
550                            "oauth2" => {
551                                // Standard OAuth2 authentication
552                                if let Some(credential) = &self.catalog_credential {
553                                    java_catalog_configs
554                                        .insert("credential".to_owned(), credential.clone());
555                                }
556                                if let Some(token) = &self.catalog_token {
557                                    java_catalog_configs.insert("token".to_owned(), token.clone());
558                                }
559                                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
560                                    java_catalog_configs.insert(
561                                        "oauth2-server-uri".to_owned(),
562                                        oauth2_server_uri.clone(),
563                                    );
564                                }
565                                if let Some(scope) = &self.catalog_scope {
566                                    java_catalog_configs.insert("scope".to_owned(), scope.clone());
567                                }
568                            }
569                            "none" | "" => {
570                                // No authentication
571                            }
572                            _ => {
573                                tracing::warn!(
574                                    "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
575                                    security
576                                );
577                            }
578                        }
579                    } else {
580                        // Legacy behavior: use individual OAuth2 properties if security type not specified
581                        if let Some(credential) = &self.catalog_credential {
582                            java_catalog_configs
583                                .insert("credential".to_owned(), credential.clone());
584                        }
585                        if let Some(token) = &self.catalog_token {
586                            java_catalog_configs.insert("token".to_owned(), token.clone());
587                        }
588                        if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
589                            java_catalog_configs
590                                .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
591                        }
592                        if let Some(scope) = &self.catalog_scope {
593                            java_catalog_configs.insert("scope".to_owned(), scope.clone());
594                        }
595                    }
596                    if let Some(rest_signing_region) = &self.rest_signing_region {
597                        java_catalog_configs.insert(
598                            "rest.signing-region".to_owned(),
599                            rest_signing_region.clone(),
600                        );
601                    }
602                    if let Some(rest_signing_name) = &self.rest_signing_name {
603                        java_catalog_configs
604                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
605                    }
606                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
607                        java_catalog_configs.insert(
608                            "rest.sigv4-enabled".to_owned(),
609                            rest_sigv4_enabled.to_string(),
610                        );
611
612                        if let Some(access_key) = &self.s3_access_key {
613                            java_catalog_configs
614                                .insert("rest.access-key-id".to_owned(), access_key.clone());
615                        }
616
617                        if let Some(secret_key) = &self.s3_secret_key {
618                            java_catalog_configs
619                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
620                        }
621                    }
622                }
623                "glue" => {
624                    let glue_access_key = self.glue_access_key();
625                    let glue_secret_key = self.glue_secret_key();
626                    let has_glue_credentials =
627                        glue_access_key.is_some() && glue_secret_key.is_some();
628                    let should_configure_glue_provider = !enable_config_load
629                        || has_glue_credentials
630                        || self.glue_iam_role_arn.is_some();
631
632                    if should_configure_glue_provider {
633                        java_catalog_configs.insert(
634                            "client.credentials-provider".to_owned(),
635                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
636                        );
637                        if let Some(region) = self.glue_region() {
638                            java_catalog_configs.insert(
639                                "client.credentials-provider.glue.region".to_owned(),
640                                region.to_owned(),
641                            );
642                        }
643                        if let Some(access_key) = glue_access_key {
644                            java_catalog_configs.insert(
645                                "client.credentials-provider.glue.access-key-id".to_owned(),
646                                access_key.to_owned(),
647                            );
648                        }
649                        if let Some(secret_key) = glue_secret_key {
650                            java_catalog_configs.insert(
651                                "client.credentials-provider.glue.secret-access-key".to_owned(),
652                                secret_key.to_owned(),
653                            );
654                        }
655                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
656                            java_catalog_configs.insert(
657                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
658                                role_arn.to_owned(),
659                            );
660                        }
661                        if enable_config_load && !has_glue_credentials {
662                            java_catalog_configs.insert(
663                                "client.credentials-provider.glue.use-default-credential-chain"
664                                    .to_owned(),
665                                "true".to_owned(),
666                            );
667                        }
668                    }
669
670                    if let Some(region) = self.glue_region() {
671                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
672                        java_catalog_configs.insert(
673                            "glue.endpoint".to_owned(),
674                            format!("https://glue.{}.amazonaws.com", region),
675                        );
676                    }
677
678                    if let Some(glue_id) = self.glue_id.as_deref() {
679                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
680                    }
681                }
682                "jdbc" => {
683                    if let Some(iam_role_arn) = &self.s3_iam_role_arn {
684                        java_catalog_configs
685                            .insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
686                        java_catalog_configs.insert(
687                            "client.factory".to_owned(),
688                            "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
689                        );
690                        if let Some(region) = &self.s3_region {
691                            java_catalog_configs
692                                .insert("client.assume-role.region".to_owned(), region.clone());
693                        }
694                    }
695                }
696                _ => {}
697            }
698        }
699
700        Ok((file_io_props, java_catalog_configs))
701    }
702}
703
704impl IcebergCommon {
705    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
706    pub async fn create_catalog(
707        &self,
708        java_catalog_props: &HashMap<String, String>,
709    ) -> ConnectorResult<Arc<dyn Catalog>> {
710        match self.catalog_type() {
711            "storage" => {
712                let warehouse = self
713                    .warehouse_path
714                    .clone()
715                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
716                let url = Url::parse(warehouse.as_ref())
717                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
718
719                let config = match url.scheme() {
720                    "s3" | "s3a" => StorageCatalogConfig::S3(
721                        storage_catalog::StorageCatalogS3Config::builder()
722                            .warehouse(warehouse)
723                            .access_key(self.s3_access_key.clone())
724                            .secret_key(self.s3_secret_key.clone())
725                            .region(self.s3_region.clone())
726                            .endpoint(self.s3_endpoint.clone())
727                            .path_style_access(self.s3_path_style_access)
728                            .enable_config_load(Some(self.enable_config_load()))
729                            .build(),
730                    ),
731                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
732                        storage_catalog::StorageCatalogGcsConfig::builder()
733                            .warehouse(warehouse)
734                            .credential(self.gcs_credential.clone())
735                            .enable_config_load(Some(self.enable_config_load()))
736                            .build(),
737                    ),
738                    "azblob" => StorageCatalogConfig::Azblob(
739                        storage_catalog::StorageCatalogAzblobConfig::builder()
740                            .warehouse(warehouse)
741                            .account_name(self.azblob_account_name.clone())
742                            .account_key(self.azblob_account_key.clone())
743                            .endpoint(self.azblob_endpoint_url.clone())
744                            .build(),
745                    ),
746                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
747                };
748
749                let catalog = storage_catalog::StorageCatalog::new(config)?;
750                Ok(Arc::new(catalog))
751            }
752            "rest_rust" => {
753                let mut iceberg_configs = HashMap::new();
754
755                // check gcs credential or s3 access key and secret key
756                if let Some(gcs_credential) = &self.gcs_credential {
757                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
758                } else {
759                    if let Some(region) = &self.s3_region {
760                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
761                    }
762                    if let Some(endpoint) = &self.s3_endpoint {
763                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
764                    }
765                    if let Some(access_key) = &self.s3_access_key {
766                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
767                    }
768                    if let Some(secret_key) = &self.s3_secret_key {
769                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
770                    }
771                    if let Some(path_style_access) = &self.s3_path_style_access {
772                        iceberg_configs.insert(
773                            S3_PATH_STYLE_ACCESS.to_owned(),
774                            path_style_access.to_string(),
775                        );
776                    }
777                };
778
779                if let Some(credential) = &self.catalog_credential {
780                    iceberg_configs.insert("credential".to_owned(), credential.clone());
781                }
782                if let Some(token) = &self.catalog_token {
783                    iceberg_configs.insert("token".to_owned(), token.clone());
784                }
785                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
786                    iceberg_configs
787                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
788                }
789                if let Some(scope) = &self.catalog_scope {
790                    iceberg_configs.insert("scope".to_owned(), scope.clone());
791                }
792
793                let headers = self.headers()?;
794                for (header_name, header_value) in headers {
795                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
796                }
797
798                iceberg_configs.insert(
799                    iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
800                    self.catalog_uri
801                        .clone()
802                        .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
803                );
804                if let Some(warehouse_path) = &self.warehouse_path {
805                    iceberg_configs.insert(
806                        iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
807                        warehouse_path.clone(),
808                    );
809                }
810                let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
811                    .load("rest", iceberg_configs)
812                    .await
813                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
814                Ok(Arc::new(catalog))
815            }
816            "glue_rust" => {
817                let mut iceberg_configs = HashMap::new();
818                // glue
819                if let Some(region) = self.glue_region() {
820                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
821                }
822                if let Some(access_key) = self.glue_access_key() {
823                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
824                }
825                if let Some(secret_key) = self.glue_secret_key() {
826                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
827                }
828                // s3
829                if let Some(region) = &self.s3_region {
830                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
831                }
832                if let Some(endpoint) = &self.s3_endpoint {
833                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
834                }
835                if let Some(access_key) = &self.s3_access_key {
836                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
837                }
838                if let Some(secret_key) = &self.s3_secret_key {
839                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
840                }
841                if let Some(role_arn) = &self.s3_iam_role_arn {
842                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
843                }
844                if let Some(path_style_access) = &self.s3_path_style_access {
845                    iceberg_configs.insert(
846                        S3_PATH_STYLE_ACCESS.to_owned(),
847                        path_style_access.to_string(),
848                    );
849                }
850                iceberg_configs.insert(
851                    iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
852                    self.warehouse_path
853                        .clone()
854                        .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
855                );
856                if let Some(uri) = self.catalog_uri.as_deref() {
857                    iceberg_configs.insert(
858                        iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
859                        uri.to_owned(),
860                    );
861                }
862                let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
863                    .load("glue", iceberg_configs)
864                    .await
865                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
866                Ok(Arc::new(catalog))
867            }
868            catalog_type
869                if catalog_type == "hive"
870                    || catalog_type == "snowflake"
871                    || catalog_type == "jdbc"
872                    || catalog_type == "rest"
873                    || catalog_type == "glue" =>
874            {
875                // Create java catalog
876                let (file_io_props, java_catalog_props) =
877                    self.build_jni_catalog_configs(java_catalog_props)?;
878                let catalog_impl = match catalog_type {
879                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
880                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
881                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
882                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
883                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
884                    _ => unreachable!(),
885                };
886
887                jni_catalog::JniCatalog::build_catalog(
888                    file_io_props,
889                    self.catalog_name(),
890                    catalog_impl,
891                    java_catalog_props,
892                )
893            }
894            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
895            _ => {
896                bail!(
897                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
898                    self.catalog_type()
899                )
900            }
901        }
902    }
903
904    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
905    pub async fn load_table(
906        &self,
907        table: &IcebergTableIdentifier,
908        java_catalog_props: &HashMap<String, String>,
909    ) -> ConnectorResult<Table> {
910        let catalog = self
911            .create_catalog(java_catalog_props)
912            .await
913            .context("Unable to load iceberg catalog")?;
914
915        let table_id = table
916            .to_table_ident()
917            .context("Unable to parse table name")?;
918
919        let table = catalog.load_table(&table_id).await?;
920        Ok(rebuild_table_with_shared_cache(table).await)
921    }
922}
923
924/// Get a globally shared object cache keyed by table UUID to avoid reuse after drop & recreate.
925pub(crate) async fn shared_object_cache(
926    init_object_cache: Arc<ObjectCache>,
927    table_uuid: Uuid,
928) -> Arc<ObjectCache> {
929    static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
930        MokaCache::builder()
931            .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
932            .build()
933    });
934
935    CACHE
936        .get_with(table_uuid, async { init_object_cache })
937        .await
938}
939
940pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
941    let table_uuid = table.metadata().uuid();
942    let init_object_cache = table.object_cache();
943    let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
944    table.with_object_cache(object_cache)
945}
946
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `IcebergTableIdentifier` from borrowed parts.
    fn ident(database_name: Option<&str>, table_name: &str) -> IcebergTableIdentifier {
        IcebergTableIdentifier {
            database_name: database_name.map(str::to_owned),
            table_name: table_name.to_owned(),
        }
    }

    /// Asserts that validation rejects `identifier` because one of its parts is empty.
    fn assert_rejects_empty_part(identifier: IcebergTableIdentifier) {
        let result = identifier.validate();
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("identifier parts must not be empty"));
    }

    #[test]
    fn test_iceberg_table_identifier_validation() {
        let accepted = [
            ident(Some("valid_db"), "test_table"),
            ident(Some("valid_db_name"), "test_table"),
            ident(None, "test_table"),
        ];
        for identifier in accepted {
            assert!(identifier.validate().is_ok());
        }

        // Consecutive dots in the database name produce an empty namespace part.
        assert_rejects_empty_part(ident(Some("a..b"), "test_table"));
        // A leading dot in the table name produces an empty leading part.
        assert_rejects_empty_part(ident(None, ".test_table"));
    }

    #[test]
    fn test_iceberg_table_identifier_dots_as_namespace_separators() {
        // Wherever the dots appear (database name, table name, or both), the identifier must
        // resolve to the same namespace and table.
        let layouts = [
            (Some("general.zia.stats"), "tagged_security_transactions"),
            (Some("general"), "zia.stats.tagged_security_transactions"),
            (None, "general.zia.stats.tagged_security_transactions"),
        ];

        for (database_name, table_name) in layouts {
            let table_ident = ident(database_name, table_name).to_table_ident().unwrap();
            let namespace: Vec<_> = table_ident
                .namespace()
                .as_ref()
                .iter()
                .map(String::as_str)
                .collect();
            assert_eq!(namespace, vec!["general", "zia", "stats"]);
            assert_eq!(table_ident.name(), "tagged_security_transactions");
        }
    }
}