risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
/// Common Iceberg connector options: catalog selection, object-store
/// credentials (S3, GCS, Azure Blob, ADLS Gen2) and REST/Glue catalog settings.
///
/// Every field is optional and is deserialized from the option key named in its
/// `serde(rename = ...)` attribute (or from the field name when no rename is given).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 object-store settings. `s3.iam_role_arn` is forwarded as the assume-role
    // ARN when the catalog configs are built.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue catalog settings. Access key, secret key and region fall back to the
    // corresponding `s3.*` value when unset (see the private `glue_*` accessors).
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential. Setting it with a catalog type other than `rest` is
    // rejected when the catalog configs are built.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings. They only take effect when all three are set, and
    // only with a `rest` catalog.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings. Account name and key take effect only when both are
    // set, and only with a `rest` catalog.
    // NOTE(review): `adlsgen2.endpoint` is not read by `build_jni_catalog_configs`;
    // confirm where (or whether) it is consumed.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Whether S3 path-style access is used; forwarded unchanged to both the
    // file IO properties and the Java catalog properties when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The HTTP header to be used in catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// For Google Cloud Lakehouse Iceberg REST catalogs, set
    /// `catalog.header = "x-goog-user-project=PROJECT_ID"` to specify the billing project.
    /// Explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for Iceberg REST catalog.
    /// For Google Cloud Lakehouse Iceberg REST catalogs, this sends
    /// `X-Iceberg-Access-Delegation: vended-credentials`.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    /// Security type for REST catalog authentication.
    /// Supported values: `none`, `oauth2`, `google`.
    /// When set to `google`, uses Iceberg's `GoogleAuthManager` (requires Iceberg 1.10+)
    /// for authentication using Google Application Default Credentials (ADC).
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    /// OAuth-based scopes for Google authentication.
    /// Comma-separated list of OAuth-based scopes to request.
    /// Only applicable when `catalog.security` is set to `google`.
    /// Default: <https://www.googleapis.com/auth/cloud-platform>
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    /// Custom `FileIO` implementation class for the Iceberg catalog.
    /// Allows specifying a custom `FileIO` implementation instead of the default.
    /// Examples:
    /// - `org.apache.iceberg.aws.s3.S3FileIO` for Amazon S3 (default)
    /// - `org.apache.iceberg.gcp.gcs.GCSFileIO` for Google Cloud Storage
    /// - `org.apache.iceberg.azure.adlsv2.ADLSFileIO` for Azure Data Lake Storage Gen2
    /// Google Cloud Lakehouse Iceberg REST catalogs with credential vending require
    /// `org.apache.iceberg.gcp.gcs.GCSFileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
203
// Matches the iceberg::io::object_cache default size (32MB).
// TODO: change it after the object cache gets refactored.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024;
// Overall byte budget (512MB) for object caches.
// NOTE(review): presumably shared across all cached tables; the consumer is not
// visible in this part of the file.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 * 1024 * 1024;
// How many default-sized object caches fit into the budget: 512MB / 32MB = 16.
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
210
/// Option keys of [`IcebergCommon`] that are subject to secret enforcement.
///
/// NOTE(review): `adlsgen2.client_secret` has no matching field on
/// `IcebergCommon` in this file, while `azblob.account_key` is a credential
/// field that is not listed here. Confirm whether both are intentional.
impl EnforceSecret for IcebergCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
225
/// Identifies an Iceberg table by an optional database name plus a table name.
///
/// Unknown option keys are rejected during deserialization
/// (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional database (namespace) name. When present, its dot-separated parts
    // are placed in front of the parts of `table.name`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name or namespace-qualified table name. Dots are treated as
    /// Iceberg namespace separators.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
237
238impl IcebergTableIdentifier {
239    pub fn database_name(&self) -> Option<&str> {
240        self.database_name.as_deref()
241    }
242
243    pub fn table_name(&self) -> &str {
244        &self.table_name
245    }
246
247    fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
248        let mut parts = Vec::new();
249        if let Some(database_name) = &self.database_name {
250            parts.extend(database_name.split('.'));
251        }
252        parts.extend(self.table_name.split('.'));
253
254        if parts.iter().any(|part| part.is_empty()) {
255            bail!(
256                "Invalid iceberg table identifier '{}': identifier parts must not be empty",
257                self.full_identifier()
258            );
259        }
260
261        Ok(parts)
262    }
263
264    fn full_identifier(&self) -> String {
265        match &self.database_name {
266            Some(database_name) => format!("{}.{}", database_name, self.table_name),
267            None => self.table_name.clone(),
268        }
269    }
270
271    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
272        let ret = TableIdent::from_strs(self.identifier_parts()?);
273
274        Ok(ret.context("Failed to create table identifier")?)
275    }
276
277    pub fn validate(&self) -> ConnectorResult<()> {
278        self.identifier_parts().map(|_| ())
279    }
280}
281
282impl IcebergCommon {
283    pub fn catalog_type(&self) -> &str {
284        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
285        if self.vended_credentials() && catalog_type == "rest" {
286            "rest_rust"
287        } else {
288            catalog_type
289        }
290    }
291
292    pub fn vended_credentials(&self) -> bool {
293        self.vended_credentials.unwrap_or(false)
294    }
295
296    fn glue_access_key(&self) -> Option<&str> {
297        self.glue_access_key
298            .as_deref()
299            .or(self.s3_access_key.as_deref())
300    }
301
302    fn glue_secret_key(&self) -> Option<&str> {
303        self.glue_secret_key
304            .as_deref()
305            .or(self.s3_secret_key.as_deref())
306    }
307
308    fn glue_region(&self) -> Option<&str> {
309        self.glue_region.as_deref().or(self.s3_region.as_deref())
310    }
311
312    pub fn catalog_name(&self) -> String {
313        self.catalog_name
314            .as_ref()
315            .cloned()
316            .unwrap_or_else(|| "risingwave".to_owned())
317    }
318
319    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
320        let mut headers = HashMap::new();
321        let user_agent = match Deployment::current() {
322            Deployment::Ci => "RisingWave(CI)".to_owned(),
323            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
324            Deployment::Other => "RisingWave(OSS)".to_owned(),
325        };
326        if self.vended_credentials() {
327            headers.insert(
328                "X-Iceberg-Access-Delegation".to_owned(),
329                "vended-credentials".to_owned(),
330            );
331        }
332        headers.insert("User-Agent".to_owned(), user_agent);
333        if let Some(header) = &self.catalog_header {
334            for pair in header.split(';') {
335                let mut parts = pair.split('=');
336                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
337                    headers.insert(key.to_owned(), value.to_owned());
338                } else {
339                    bail!("Invalid header format: {}", pair);
340                }
341            }
342        }
343        Ok(headers)
344    }
345
346    pub fn enable_config_load(&self) -> bool {
347        // If the env var is set to true, we disable the default config load. (Cloud environment)
348        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
349            if matches!(self.enable_config_load, Some(true)) {
350                tracing::warn!(
351                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
352                );
353            }
354            return false;
355        }
356        self.enable_config_load.unwrap_or(false)
357    }
358
    /// For both V1 and V2.
    ///
    /// Builds the two property maps derived from these options:
    ///
    /// 1. file IO properties keyed by the iceberg-rust constants (`S3_REGION`,
    ///    `GCS_CREDENTIALS_JSON`, ...), plus `iceberg.table.io.bucket` when a bucket
    ///    can be parsed from `warehouse.path`;
    /// 2. Java catalog properties (`uri`, `warehouse`, `io-impl`, `header.*`, and
    ///    catalog-type specific entries for `rest`, `glue` and `jdbc`).
    ///
    /// `java_catalog_props` is merged into the Java properties after `uri` and
    /// `warehouse`, so it can override those two; every key inserted afterwards
    /// (for example `io-impl`) overrides a caller-provided value.
    ///
    /// # Errors
    /// - GCS, Azure Blob or ADLS Gen2 credentials are set with a catalog type other
    ///   than `rest` / `rest_rust`;
    /// - `warehouse.path` is missing for a catalog type other than `rest` / `rest_rust`;
    /// - `warehouse.path` is not a valid URL or has no host, unless it is let
    ///   through for a REST catalog;
    /// - `catalog.header` cannot be parsed (see [`Self::headers`]).
    fn build_jni_catalog_configs(
        &self,
        java_catalog_props: &HashMap<String, String>,
    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
        let mut iceberg_configs = HashMap::new();
        let enable_config_load = self.enable_config_load();
        let file_io_props = {
            // Effective type: "rest" with vended credentials is reported as "rest_rust".
            let catalog_type = self.catalog_type().to_owned();

            if let Some(region) = &self.s3_region {
                // iceberg-rust
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }

            if let Some(endpoint) = &self.s3_endpoint {
                // iceberg-rust
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }

            // iceberg-rust
            if let Some(access_key) = &self.s3_access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(role_arn) = &self.s3_iam_role_arn {
                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
            }
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("gcs unsupported in {} catalog", &catalog_type);
                }
            }

            // Azure Blob settings are applied only when name, key and endpoint
            // are all present; a partial set is silently ignored.
            if let (
                Some(azblob_account_name),
                Some(azblob_account_key),
                Some(azblob_endpoint_url),
            ) = (
                &self.azblob_account_name,
                &self.azblob_account_key,
                &self.azblob_endpoint_url,
            ) {
                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("azblob unsupported in {} catalog", &catalog_type);
                }
            }

            // ADLS Gen2 settings are applied only when both name and key are present.
            if let (Some(account_name), Some(account_key)) = (
                self.adlsgen2_account_name.as_ref(),
                self.adlsgen2_account_key.as_ref(),
            ) {
                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
                }
            }

            match &self.warehouse_path {
                Some(warehouse_path) => {
                    // Only the bucket (URL host) is used below; the root path is
                    // computed but discarded.
                    let (bucket, _) = {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        // Lakehouse Iceberg REST catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables.
                        let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                        let url = Url::parse(warehouse_path);
                        if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
                            && (catalog_type == "rest" || catalog_type == "rest_rust")
                        {
                            // If the warehouse path is not a valid URL, it could be:
                            // - A warehouse name in REST catalog
                            // - An S3 Tables path (arn:aws:s3tables:...)
                            // - A Lakehouse path (bq://projects/...) for Google Cloud BigQuery integration
                            // We allow these to pass through for REST catalogs.
                            (None, None)
                        } else {
                            let url = url.with_context(|| {
                                format!("Invalid warehouse path: {}", warehouse_path)
                            })?;
                            let bucket = url
                                .host_str()
                                .with_context(|| {
                                    format!(
                                        "Invalid s3 path: {}, bucket is missing",
                                        warehouse_path
                                    )
                                })?
                                .to_owned();
                            let root = url.path().trim_start_matches('/').to_owned();
                            (Some(bucket), Some(root))
                        }
                    };

                    if let Some(bucket) = bucket {
                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                    }
                }
                None => {
                    if catalog_type != "rest" && catalog_type != "rest_rust" {
                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                    }
                }
            }
            // The iceberg-rust flags are "disable" switches, hence the negation.
            iceberg_configs.insert(
                S3_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            iceberg_configs.insert(
                GCS_DISABLE_CONFIG_LOAD.to_owned(),
                (!enable_config_load).to_string(),
            );

            if let Some(path_style_access) = self.s3_path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }

            iceberg_configs
        };

        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
        let mut java_catalog_configs = HashMap::new();
        {
            if let Some(uri) = self.catalog_uri.as_deref() {
                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
            }

            if let Some(warehouse_path) = &self.warehouse_path {
                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
            }
            // Caller-provided properties may override `uri` / `warehouse` above,
            // but are themselves overridden by every key inserted below.
            java_catalog_configs.extend(java_catalog_props.clone());

            // Set io-impl: use custom io-impl if provided, otherwise default to S3FileIO
            let io_impl = self
                .catalog_io_impl
                .clone()
                .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
            java_catalog_configs.insert("io-impl".to_owned(), io_impl);

            // suppress log of FileIO like: Unclosed FileIO instance created by...
            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

            if let Some(region) = &self.s3_region {
                java_catalog_configs.insert("client.region".to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.s3_endpoint {
                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
            }

            if let Some(access_key) = &self.s3_access_key {
                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
            }

            if let Some(path_style_access) = &self.s3_path_style_access {
                java_catalog_configs.insert(
                    "s3.path-style-access".to_owned(),
                    path_style_access.to_string(),
                );
            }

            // Each HTTP header becomes a `header.<name>` catalog property.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
            }

            // Note: this matches the effective type, so a REST catalog with
            // vended credentials ("rest_rust") falls through to the `_` arm.
            match self.catalog_type() {
                "rest" => {
                    // Handle security type for REST catalog (Iceberg 1.10+)
                    if let Some(security) = &self.catalog_security {
                        match security.to_lowercase().as_str() {
                            "google" => {
                                // Google AuthManager (Iceberg 1.10+) - uses Google ADC
                                java_catalog_configs.insert(
                                    "rest.auth.type".to_owned(),
                                    "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
                                );
                                // Set GCP auth scopes if provided
                                if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
                                    java_catalog_configs.insert(
                                        "gcp.auth.scopes".to_owned(),
                                        gcp_auth_scopes.clone(),
                                    );
                                }
                            }
                            "oauth2" => {
                                // Standard OAuth2 authentication
                                if let Some(credential) = &self.catalog_credential {
                                    java_catalog_configs
                                        .insert("credential".to_owned(), credential.clone());
                                }
                                if let Some(token) = &self.catalog_token {
                                    java_catalog_configs.insert("token".to_owned(), token.clone());
                                }
                                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                                    java_catalog_configs.insert(
                                        "oauth2-server-uri".to_owned(),
                                        oauth2_server_uri.clone(),
                                    );
                                }
                                if let Some(scope) = &self.catalog_scope {
                                    java_catalog_configs.insert("scope".to_owned(), scope.clone());
                                }
                            }
                            "none" | "" => {
                                // No authentication
                            }
                            _ => {
                                // Unknown values are only logged; no auth
                                // properties are set and no error is returned.
                                tracing::warn!(
                                    "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
                                    security
                                );
                            }
                        }
                    } else {
                        // Legacy behavior: use individual OAuth2 properties if security type not specified
                        if let Some(credential) = &self.catalog_credential {
                            java_catalog_configs
                                .insert("credential".to_owned(), credential.clone());
                        }
                        if let Some(token) = &self.catalog_token {
                            java_catalog_configs.insert("token".to_owned(), token.clone());
                        }
                        if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                            java_catalog_configs
                                .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                        }
                        if let Some(scope) = &self.catalog_scope {
                            java_catalog_configs.insert("scope".to_owned(), scope.clone());
                        }
                    }
                    if let Some(rest_signing_region) = &self.rest_signing_region {
                        java_catalog_configs.insert(
                            "rest.signing-region".to_owned(),
                            rest_signing_region.clone(),
                        );
                    }
                    if let Some(rest_signing_name) = &self.rest_signing_name {
                        java_catalog_configs
                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                    }
                    // NOTE(review): the signing keys below are set whenever the
                    // option is present, including when it is explicitly `false`.
                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                        java_catalog_configs.insert(
                            "rest.sigv4-enabled".to_owned(),
                            rest_sigv4_enabled.to_string(),
                        );

                        if let Some(access_key) = &self.s3_access_key {
                            java_catalog_configs
                                .insert("rest.access-key-id".to_owned(), access_key.clone());
                        }

                        if let Some(secret_key) = &self.s3_secret_key {
                            java_catalog_configs
                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                        }
                    }
                }
                "glue" => {
                    let glue_access_key = self.glue_access_key();
                    let glue_secret_key = self.glue_secret_key();
                    let has_glue_credentials =
                        glue_access_key.is_some() && glue_secret_key.is_some();
                    // The custom provider is skipped only when config load is
                    // enabled and neither static credentials nor a role ARN are given.
                    let should_configure_glue_provider = !enable_config_load
                        || has_glue_credentials
                        || self.glue_iam_role_arn.is_some();

                    if should_configure_glue_provider {
                        java_catalog_configs.insert(
                            "client.credentials-provider".to_owned(),
                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                        );
                        if let Some(region) = self.glue_region() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.region".to_owned(),
                                region.to_owned(),
                            );
                        }
                        if let Some(access_key) = glue_access_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.access-key-id".to_owned(),
                                access_key.to_owned(),
                            );
                        }
                        if let Some(secret_key) = glue_secret_key {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.secret-access-key".to_owned(),
                                secret_key.to_owned(),
                            );
                        }
                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
                                role_arn.to_owned(),
                            );
                        }
                        if enable_config_load && !has_glue_credentials {
                            java_catalog_configs.insert(
                                "client.credentials-provider.glue.use-default-credential-chain"
                                    .to_owned(),
                                "true".to_owned(),
                            );
                        }
                    }

                    // Overrides the `client.region` set from `s3.region` above
                    // with the Glue region (which itself falls back to `s3.region`).
                    if let Some(region) = self.glue_region() {
                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
                        java_catalog_configs.insert(
                            "glue.endpoint".to_owned(),
                            format!("https://glue.{}.amazonaws.com", region),
                        );
                    }

                    if let Some(glue_id) = self.glue_id.as_deref() {
                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                    }
                    self.apply_java_s3_file_io_assume_role_configs(&mut java_catalog_configs);
                }
                "jdbc" => {
                    self.apply_java_aws_client_assume_role_configs(&mut java_catalog_configs);
                }
                _ => {}
            }
        }

        Ok((file_io_props, java_catalog_configs))
    }
698
699    fn apply_java_s3_file_io_assume_role_configs(
700        &self,
701        java_catalog_configs: &mut HashMap<String, String>,
702    ) {
703        if let Some(iam_role_arn) = &self.s3_iam_role_arn {
704            java_catalog_configs.insert(
705                "s3.client-factory-impl".to_owned(),
706                "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory".to_owned(),
707            );
708            java_catalog_configs.insert("s3.iam-role-arn".to_owned(), iam_role_arn.clone());
709        }
710    }
711
712    fn apply_java_aws_client_assume_role_configs(
713        &self,
714        java_catalog_configs: &mut HashMap<String, String>,
715    ) {
716        if let Some(iam_role_arn) = &self.s3_iam_role_arn {
717            java_catalog_configs.insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
718            java_catalog_configs.insert(
719                "client.factory".to_owned(),
720                "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
721            );
722            if let Some(region) = &self.s3_region {
723                java_catalog_configs.insert("client.assume-role.region".to_owned(), region.clone());
724            }
725        }
726    }
727}
728
729impl IcebergCommon {
730    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
731    pub async fn create_catalog(
732        &self,
733        java_catalog_props: &HashMap<String, String>,
734    ) -> ConnectorResult<Arc<dyn Catalog>> {
735        match self.catalog_type() {
736            "storage" => {
737                let warehouse = self
738                    .warehouse_path
739                    .clone()
740                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
741                let url = Url::parse(warehouse.as_ref())
742                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
743
744                let config = match url.scheme() {
745                    "s3" | "s3a" => StorageCatalogConfig::S3(
746                        storage_catalog::StorageCatalogS3Config::builder()
747                            .warehouse(warehouse)
748                            .access_key(self.s3_access_key.clone())
749                            .secret_key(self.s3_secret_key.clone())
750                            .region(self.s3_region.clone())
751                            .endpoint(self.s3_endpoint.clone())
752                            .path_style_access(self.s3_path_style_access)
753                            .enable_config_load(Some(self.enable_config_load()))
754                            .build(),
755                    ),
756                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
757                        storage_catalog::StorageCatalogGcsConfig::builder()
758                            .warehouse(warehouse)
759                            .credential(self.gcs_credential.clone())
760                            .enable_config_load(Some(self.enable_config_load()))
761                            .build(),
762                    ),
763                    "azblob" => StorageCatalogConfig::Azblob(
764                        storage_catalog::StorageCatalogAzblobConfig::builder()
765                            .warehouse(warehouse)
766                            .account_name(self.azblob_account_name.clone())
767                            .account_key(self.azblob_account_key.clone())
768                            .endpoint(self.azblob_endpoint_url.clone())
769                            .build(),
770                    ),
771                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
772                };
773
774                let catalog = storage_catalog::StorageCatalog::new(config)?;
775                Ok(Arc::new(catalog))
776            }
777            "rest_rust" => {
778                let mut iceberg_configs = HashMap::new();
779
780                // check gcs credential or s3 access key and secret key
781                if let Some(gcs_credential) = &self.gcs_credential {
782                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
783                } else {
784                    if let Some(region) = &self.s3_region {
785                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
786                    }
787                    if let Some(endpoint) = &self.s3_endpoint {
788                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
789                    }
790                    if let Some(access_key) = &self.s3_access_key {
791                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
792                    }
793                    if let Some(secret_key) = &self.s3_secret_key {
794                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
795                    }
796                    if let Some(path_style_access) = &self.s3_path_style_access {
797                        iceberg_configs.insert(
798                            S3_PATH_STYLE_ACCESS.to_owned(),
799                            path_style_access.to_string(),
800                        );
801                    }
802                };
803
804                if let Some(credential) = &self.catalog_credential {
805                    iceberg_configs.insert("credential".to_owned(), credential.clone());
806                }
807                if let Some(token) = &self.catalog_token {
808                    iceberg_configs.insert("token".to_owned(), token.clone());
809                }
810                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
811                    iceberg_configs
812                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
813                }
814                if let Some(scope) = &self.catalog_scope {
815                    iceberg_configs.insert("scope".to_owned(), scope.clone());
816                }
817
818                let headers = self.headers()?;
819                for (header_name, header_value) in headers {
820                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
821                }
822
823                iceberg_configs.insert(
824                    iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
825                    self.catalog_uri
826                        .clone()
827                        .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
828                );
829                if let Some(warehouse_path) = &self.warehouse_path {
830                    iceberg_configs.insert(
831                        iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
832                        warehouse_path.clone(),
833                    );
834                }
835                let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
836                    .load("rest", iceberg_configs)
837                    .await
838                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
839                Ok(Arc::new(catalog))
840            }
841            "glue_rust" => {
842                let mut iceberg_configs = HashMap::new();
843                // glue
844                if let Some(region) = self.glue_region() {
845                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
846                }
847                if let Some(access_key) = self.glue_access_key() {
848                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
849                }
850                if let Some(secret_key) = self.glue_secret_key() {
851                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
852                }
853                // s3
854                if let Some(region) = &self.s3_region {
855                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
856                }
857                if let Some(endpoint) = &self.s3_endpoint {
858                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
859                }
860                if let Some(access_key) = &self.s3_access_key {
861                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
862                }
863                if let Some(secret_key) = &self.s3_secret_key {
864                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
865                }
866                if let Some(role_arn) = &self.s3_iam_role_arn {
867                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
868                }
869                if let Some(path_style_access) = &self.s3_path_style_access {
870                    iceberg_configs.insert(
871                        S3_PATH_STYLE_ACCESS.to_owned(),
872                        path_style_access.to_string(),
873                    );
874                }
875                iceberg_configs.insert(
876                    iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
877                    self.warehouse_path
878                        .clone()
879                        .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
880                );
881                if let Some(uri) = self.catalog_uri.as_deref() {
882                    iceberg_configs.insert(
883                        iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
884                        uri.to_owned(),
885                    );
886                }
887                let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
888                    .load("glue", iceberg_configs)
889                    .await
890                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
891                Ok(Arc::new(catalog))
892            }
893            catalog_type
894                if catalog_type == "hive"
895                    || catalog_type == "snowflake"
896                    || catalog_type == "jdbc"
897                    || catalog_type == "rest"
898                    || catalog_type == "glue" =>
899            {
900                // Create java catalog
901                let (file_io_props, java_catalog_props) =
902                    self.build_jni_catalog_configs(java_catalog_props)?;
903                let catalog_impl = match catalog_type {
904                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
905                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
906                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
907                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
908                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
909                    _ => unreachable!(),
910                };
911
912                jni_catalog::JniCatalog::build_catalog(
913                    file_io_props,
914                    self.catalog_name(),
915                    catalog_impl,
916                    java_catalog_props,
917                )
918            }
919            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
920            _ => {
921                bail!(
922                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
923                    self.catalog_type()
924                )
925            }
926        }
927    }
928
929    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
930    pub async fn load_table(
931        &self,
932        table: &IcebergTableIdentifier,
933        java_catalog_props: &HashMap<String, String>,
934    ) -> ConnectorResult<Table> {
935        let catalog = self
936            .create_catalog(java_catalog_props)
937            .await
938            .context("Unable to load iceberg catalog")?;
939
940        let table_id = table
941            .to_table_ident()
942            .context("Unable to parse table name")?;
943
944        let table = catalog.load_table(&table_id).await?;
945        Ok(rebuild_table_with_shared_cache(table).await)
946    }
947}
948
949/// Get a globally shared object cache keyed by table UUID to avoid reuse after drop & recreate.
950pub(crate) async fn shared_object_cache(
951    init_object_cache: Arc<ObjectCache>,
952    table_uuid: Uuid,
953) -> Arc<ObjectCache> {
954    static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
955        MokaCache::builder()
956            .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
957            .build()
958    });
959
960    CACHE
961        .get_with(table_uuid, async { init_object_cache })
962        .await
963}
964
965pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
966    let table_uuid = table.metadata().uuid();
967    let init_object_cache = table.object_cache();
968    let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
969    table.with_object_cache(object_cache)
970}
971
972#[cfg(test)]
973mod tests {
974    use std::collections::HashMap;
975
976    use super::*;
977
978    fn test_common(catalog_type: &str) -> IcebergCommon {
979        IcebergCommon {
980            catalog_type: Some(catalog_type.to_owned()),
981            s3_region: Some("ap-southeast-2".to_owned()),
982            s3_endpoint: None,
983            s3_access_key: None,
984            s3_secret_key: None,
985            s3_iam_role_arn: None,
986            glue_access_key: None,
987            glue_secret_key: None,
988            glue_iam_role_arn: None,
989            glue_region: None,
990            glue_id: None,
991            gcs_credential: None,
992            azblob_account_name: None,
993            azblob_account_key: None,
994            azblob_endpoint_url: None,
995            adlsgen2_account_name: None,
996            adlsgen2_account_key: None,
997            adlsgen2_endpoint: None,
998            warehouse_path: Some("s3://bucket/warehouse".to_owned()),
999            catalog_name: None,
1000            catalog_uri: None,
1001            catalog_credential: None,
1002            catalog_token: None,
1003            catalog_oauth2_server_uri: None,
1004            catalog_scope: None,
1005            rest_signing_region: None,
1006            rest_signing_name: None,
1007            rest_sigv4_enabled: None,
1008            s3_path_style_access: None,
1009            enable_config_load: None,
1010            hosted_catalog: None,
1011            catalog_header: None,
1012            vended_credentials: None,
1013            catalog_security: None,
1014            gcp_auth_scopes: None,
1015            catalog_io_impl: None,
1016        }
1017    }
1018
1019    #[test]
1020    fn test_glue_jni_catalog_uses_s3_assume_role_for_file_io() {
1021        let common = IcebergCommon {
1022            s3_iam_role_arn: Some("arn:aws:iam::123456789012:role/risingwave-s3".to_owned()),
1023            ..test_common("glue")
1024        };
1025
1026        let (_, java_catalog_configs) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();
1027
1028        assert_eq!(
1029            java_catalog_configs.get("s3.client-factory-impl").unwrap(),
1030            "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory"
1031        );
1032        assert_eq!(
1033            java_catalog_configs.get("s3.iam-role-arn").unwrap(),
1034            "arn:aws:iam::123456789012:role/risingwave-s3"
1035        );
1036        assert!(!java_catalog_configs.contains_key("client.factory"));
1037    }
1038
1039    #[test]
1040    fn test_iceberg_table_identifier_validation() {
1041        let valid_identifier = IcebergTableIdentifier {
1042            database_name: Some("valid_db".to_owned()),
1043            table_name: "test_table".to_owned(),
1044        };
1045        assert!(valid_identifier.validate().is_ok());
1046
1047        let valid_underscore = IcebergTableIdentifier {
1048            database_name: Some("valid_db_name".to_owned()),
1049            table_name: "test_table".to_owned(),
1050        };
1051        assert!(valid_underscore.validate().is_ok());
1052
1053        let no_database = IcebergTableIdentifier {
1054            database_name: None,
1055            table_name: "test_table".to_owned(),
1056        };
1057        assert!(no_database.validate().is_ok());
1058
1059        let empty_part = IcebergTableIdentifier {
1060            database_name: Some("a..b".to_owned()),
1061            table_name: "test_table".to_owned(),
1062        };
1063        let result = empty_part.validate();
1064        assert!(result.is_err());
1065        assert!(
1066            result
1067                .unwrap_err()
1068                .to_string()
1069                .contains("identifier parts must not be empty")
1070        );
1071
1072        let leading_dot = IcebergTableIdentifier {
1073            database_name: None,
1074            table_name: ".test_table".to_owned(),
1075        };
1076        let result = leading_dot.validate();
1077        assert!(result.is_err());
1078        assert!(
1079            result
1080                .unwrap_err()
1081                .to_string()
1082                .contains("identifier parts must not be empty")
1083        );
1084    }
1085
1086    #[test]
1087    fn test_iceberg_table_identifier_dots_as_namespace_separators() {
1088        let table_ident = IcebergTableIdentifier {
1089            database_name: Some("general.zia.stats".to_owned()),
1090            table_name: "tagged_security_transactions".to_owned(),
1091        }
1092        .to_table_ident()
1093        .unwrap();
1094        let namespace: Vec<_> = table_ident
1095            .namespace()
1096            .as_ref()
1097            .iter()
1098            .map(String::as_str)
1099            .collect();
1100        assert_eq!(namespace, vec!["general", "zia", "stats"]);
1101        assert_eq!(table_ident.name(), "tagged_security_transactions");
1102
1103        let table_ident = IcebergTableIdentifier {
1104            database_name: Some("general".to_owned()),
1105            table_name: "zia.stats.tagged_security_transactions".to_owned(),
1106        }
1107        .to_table_ident()
1108        .unwrap();
1109        let namespace: Vec<_> = table_ident
1110            .namespace()
1111            .as_ref()
1112            .iter()
1113            .map(String::as_str)
1114            .collect();
1115        assert_eq!(namespace, vec!["general", "zia", "stats"]);
1116        assert_eq!(table_ident.name(), "tagged_security_transactions");
1117
1118        let table_ident = IcebergTableIdentifier {
1119            database_name: None,
1120            table_name: "general.zia.stats.tagged_security_transactions".to_owned(),
1121        }
1122        .to_table_ident()
1123        .unwrap();
1124        let namespace: Vec<_> = table_ident
1125            .namespace()
1126            .as_ref()
1127            .iter()
1128            .map(String::as_str)
1129            .collect();
1130        assert_eq!(namespace, vec!["general", "zia", "stats"]);
1131        assert_eq!(table_ident.name(), "tagged_security_transactions");
1132    }
1133}