Skip to main content

risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24    S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET,
32    ADLS_TENANT_ID, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
33    GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
34};
35use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
36use moka::future::Cache as MokaCache;
37use phf::{Set, phf_set};
38use risingwave_common::bail;
39use risingwave_common::error::IcebergError;
40use risingwave_common::util::deployment::Deployment;
41use risingwave_common::util::env_var::env_var_is_true;
42use serde::Deserialize;
43use serde_with::serde_as;
44use url::Url;
45use uuid::Uuid;
46use with_options::WithOptions;
47
48use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
49use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
50use crate::deserialize_optional_bool_from_string;
51use crate::enforce_secret::EnforceSecret;
52use crate::error::ConnectorResult;
53
/// Common Iceberg options: catalog selection and authentication, object-store
/// credentials (S3, GCS, Azure Blob, ADLS Gen2) and the warehouse location.
///
/// Every field is deserialized from the property key named by its
/// `#[serde(rename = ...)]` attribute; fields without a rename use the field name
/// itself. Optional boolean properties are parsed through
/// `deserialize_optional_bool_from_string`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    // Read it through `catalog_type()`, which applies the default and the
    // vended-credentials remapping of "rest".
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 object-store settings. They are also the fallback for the `glue.*`
    // access key, secret key and region (see the `glue_*` accessors).
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue settings. Access key, secret key and region fall back to the
    // corresponding `s3.*` value when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential; `build_jni_catalog_configs` forwards it as
    // `GCS_CREDENTIALS_JSON` and only accepts it for a "rest" catalog.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings. `build_jni_catalog_configs` forwards them only when
    // all three are set, and only for a "rest" catalog.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings. Shared-key auth (`account_key`) and service-principal
    // auth (`tenant_id` / `client_id` / `client_secret`, with an optional
    // `authority_host`) are mutually exclusive; the rules are enforced in
    // `build_jni_catalog_configs`.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,
    #[serde(rename = "adlsgen2.tenant_id")]
    pub adlsgen2_tenant_id: Option<String>,
    #[serde(rename = "adlsgen2.client_id")]
    pub adlsgen2_client_id: Option<String>,
    #[serde(rename = "adlsgen2.client_secret")]
    pub adlsgen2_client_secret: Option<String>,
    #[serde(rename = "adlsgen2.authority_host")]
    pub adlsgen2_authority_host: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the `OAuth2` client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for `OAuth2`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use `SigV4` for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded as `S3_PATH_STYLE_ACCESS` / `s3.path-style-access` when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Enable config load. This parameter set to true will load warehouse credentials from the environment. Only allowed to be used in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// The HTTP header to be used in catalog requests.
    /// Example:
    /// `catalog.header = "key1=value1;key2=value2;key3=value3"`
    /// For Google Cloud Lakehouse Iceberg REST catalogs, set
    /// `catalog.header = "x-goog-user-project=PROJECT_ID"` to specify the billing project.
    /// Explain the format of the header:
    /// - Each header is a key-value pair, separated by an '='.
    /// - Multiple headers can be specified, separated by a ';'.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enable vended credentials for Iceberg REST catalog.
    /// For Google Cloud Lakehouse Iceberg REST catalogs, this sends
    /// `X-Iceberg-Access-Delegation: vended-credentials`.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    /// Security type for REST catalog authentication.
    /// Supported values: `none`, `oauth2`, `google`.
    /// When set to `google`, uses Iceberg's `GoogleAuthManager` (requires Iceberg 1.10+)
    /// for authentication using Google Application Default Credentials (ADC).
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    /// OAuth-based scopes for Google authentication.
    /// Comma-separated list of OAuth-based scopes to request.
    /// Only applicable when `catalog.security` is set to `google`.
    /// Default: <https://www.googleapis.com/auth/cloud-platform>
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    /// Custom `FileIO` implementation class for the Iceberg catalog.
    /// Allows specifying a custom `FileIO` implementation instead of the default.
    /// Examples:
    /// - `org.apache.iceberg.aws.s3.S3FileIO` for Amazon S3 (default)
    /// - `org.apache.iceberg.gcp.gcs.GCSFileIO` for Google Cloud Storage
    /// - `org.apache.iceberg.azure.adlsv2.ADLSFileIO` for Azure Data Lake Storage Gen2
    /// Google Cloud Lakehouse Iceberg REST catalogs with credential vending require
    /// `org.apache.iceberg.gcp.gcs.GCSFileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
212
// Matches iceberg::io::object_cache default size (32MB).
// TODO: change it after the object cache gets refactored.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024;
// 512MB. NOTE(review): presumably the total memory budget across all shared
// object caches — the consumer is outside this part of the file; confirm there.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 * 1024 * 1024;
// Budget divided by the per-cache default size: 512MB / 32MB = 16 entries.
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;

/// Default Microsoft Entra (AAD) authority host for public Azure. Sovereign-cloud
/// users override via `adlsgen2.authority_host`.
///
/// Used by `build_jni_catalog_configs` when service-principal auth is configured
/// without an explicit authority host.
const ADLS_DEFAULT_AUTHORITY_HOST: &str = "https://login.microsoftonline.com";
223
/// Declares which `IcebergCommon` property keys are handed to the
/// `EnforceSecret` trait as secret properties.
///
/// The entries are the user-facing property names (the `#[serde(rename = ...)]`
/// keys on `IcebergCommon`), not the Rust field names. How they are enforced is
/// defined by the trait in `crate::enforce_secret`.
impl EnforceSecret for IcebergCommon {
    // NOTE(review): `azblob.account_key` is not listed although
    // `adlsgen2.account_key` is — confirm whether that omission is intentional.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
238
/// Identifies an Iceberg table by an optional database (namespace) name plus a
/// table name.
///
/// Deserialized from the `database.name` and `table.name` properties; any other
/// key is rejected because of `deny_unknown_fields`. Both values are split on
/// `.` when the identifier is built (see `identifier_parts`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional namespace prefix; its dot-separated parts come before the parts
    // of `table.name` in the final identifier.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name or namespace-qualified table name. Dots are treated as
    /// Iceberg namespace separators.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
250
251impl IcebergTableIdentifier {
252    pub fn database_name(&self) -> Option<&str> {
253        self.database_name.as_deref()
254    }
255
256    pub fn table_name(&self) -> &str {
257        &self.table_name
258    }
259
260    fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
261        let mut parts = Vec::new();
262        if let Some(database_name) = &self.database_name {
263            parts.extend(database_name.split('.'));
264        }
265        parts.extend(self.table_name.split('.'));
266
267        if parts.iter().any(|part| part.is_empty()) {
268            bail!(
269                "Invalid iceberg table identifier '{}': identifier parts must not be empty",
270                self.full_identifier()
271            );
272        }
273
274        Ok(parts)
275    }
276
277    fn full_identifier(&self) -> String {
278        match &self.database_name {
279            Some(database_name) => format!("{}.{}", database_name, self.table_name),
280            None => self.table_name.clone(),
281        }
282    }
283
284    pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
285        let ret = TableIdent::from_strs(self.identifier_parts()?);
286
287        Ok(ret.context("Failed to create table identifier")?)
288    }
289
290    pub fn validate(&self) -> ConnectorResult<()> {
291        self.identifier_parts().map(|_| ())
292    }
293}
294
295impl IcebergCommon {
296    pub fn catalog_type(&self) -> &str {
297        let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
298        if self.vended_credentials() && catalog_type == "rest" {
299            "rest_rust"
300        } else {
301            catalog_type
302        }
303    }
304
305    pub fn vended_credentials(&self) -> bool {
306        self.vended_credentials.unwrap_or(false)
307    }
308
309    fn glue_access_key(&self) -> Option<&str> {
310        self.glue_access_key
311            .as_deref()
312            .or(self.s3_access_key.as_deref())
313    }
314
315    fn glue_secret_key(&self) -> Option<&str> {
316        self.glue_secret_key
317            .as_deref()
318            .or(self.s3_secret_key.as_deref())
319    }
320
321    fn glue_region(&self) -> Option<&str> {
322        self.glue_region.as_deref().or(self.s3_region.as_deref())
323    }
324
325    pub fn catalog_name(&self) -> String {
326        self.catalog_name
327            .as_ref()
328            .cloned()
329            .unwrap_or_else(|| "risingwave".to_owned())
330    }
331
332    pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
333        let mut headers = HashMap::new();
334        let user_agent = match Deployment::current() {
335            Deployment::Ci => "RisingWave(CI)".to_owned(),
336            Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
337            Deployment::Other => "RisingWave(OSS)".to_owned(),
338        };
339        if self.vended_credentials() {
340            headers.insert(
341                "X-Iceberg-Access-Delegation".to_owned(),
342                "vended-credentials".to_owned(),
343            );
344        }
345        headers.insert("User-Agent".to_owned(), user_agent);
346        if let Some(header) = &self.catalog_header {
347            for pair in header.split(';') {
348                let mut parts = pair.split('=');
349                if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
350                    headers.insert(key.to_owned(), value.to_owned());
351                } else {
352                    bail!("Invalid header format: {}", pair);
353                }
354            }
355        }
356        Ok(headers)
357    }
358
359    pub fn enable_config_load(&self) -> bool {
360        // If the env var is set to true, we disable the default config load. (Cloud environment)
361        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
362            if matches!(self.enable_config_load, Some(true)) {
363                tracing::warn!(
364                    "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
365                );
366            }
367            return false;
368        }
369        self.enable_config_load.unwrap_or(false)
370    }
371
372    /// For both V1 and V2.
373    fn build_jni_catalog_configs(
374        &self,
375        java_catalog_props: &HashMap<String, String>,
376    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
377        let mut iceberg_configs = HashMap::new();
378        let enable_config_load = self.enable_config_load();
379        let file_io_props = {
380            let catalog_type = self.catalog_type().to_owned();
381
382            // Non-S3/Glue object-store backends only work with a REST catalog. This
383            // function is only invoked for catalog_type in {hive, snowflake, jdbc, rest,
384            // glue}, so the only accepted value here is "rest".
385            let require_rest = |backend: &str| -> ConnectorResult<()> {
386                if catalog_type != "rest" {
387                    bail!("{} unsupported in {} catalog", backend, &catalog_type);
388                }
389                Ok(())
390            };
391
392            if let Some(region) = &self.s3_region {
393                // iceberg-rust
394                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
395            }
396
397            if let Some(endpoint) = &self.s3_endpoint {
398                // iceberg-rust
399                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
400            }
401
402            // iceberg-rust
403            if let Some(access_key) = &self.s3_access_key {
404                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
405            }
406            if let Some(secret_key) = &self.s3_secret_key {
407                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
408            }
409            if let Some(role_arn) = &self.s3_iam_role_arn {
410                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
411            }
412            if let Some(gcs_credential) = &self.gcs_credential {
413                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
414                require_rest("gcs")?;
415            }
416
417            if let (
418                Some(azblob_account_name),
419                Some(azblob_account_key),
420                Some(azblob_endpoint_url),
421            ) = (
422                &self.azblob_account_name,
423                &self.azblob_account_key,
424                &self.azblob_endpoint_url,
425            ) {
426                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
427                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
428                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
429
430                require_rest("azblob")?;
431            }
432
433            // Validate adlsgen2 auth configuration before populating iceberg_configs.
434            // Treat empty and whitespace-only strings as unset — serde surfaces
435            // `adlsgen2.tenant_id = ''` (or a value with trailing `\n` from a copy-paste)
436            // as `Some("...")` which would pass `is_some()` but break downstream auth.
437            fn nonempty(v: &Option<String>) -> Option<&str> {
438                v.as_deref().filter(|s| !s.trim().is_empty())
439            }
440            let sp_tenant = nonempty(&self.adlsgen2_tenant_id);
441            let sp_client = nonempty(&self.adlsgen2_client_id);
442            let sp_secret = nonempty(&self.adlsgen2_client_secret);
443            let sp_authority = nonempty(&self.adlsgen2_authority_host);
444            let sk_account_name = nonempty(&self.adlsgen2_account_name);
445            let sk_account_key = nonempty(&self.adlsgen2_account_key);
446            let any_sp_field = sp_tenant.is_some()
447                || sp_client.is_some()
448                || sp_secret.is_some()
449                || sp_authority.is_some();
450            let all_sp_required = sp_tenant.is_some() && sp_client.is_some() && sp_secret.is_some();
451
452            if sk_account_key.is_some() && any_sp_field {
453                bail!(
454                    "adlsgen2: cannot configure both shared-key auth \
455                     (adlsgen2.account_key) and service-principal auth \
456                     (adlsgen2.tenant_id / adlsgen2.client_id / adlsgen2.client_secret / \
457                     adlsgen2.authority_host) simultaneously. Specify exactly one auth mode."
458                );
459            }
460            if any_sp_field && !all_sp_required {
461                bail!(
462                    "adlsgen2: service-principal auth requires all three of \
463                     adlsgen2.tenant_id, adlsgen2.client_id, and adlsgen2.client_secret \
464                     to be set. (adlsgen2.authority_host is optional and defaults to the \
465                     public Azure AAD endpoint.)"
466                );
467            }
468            // Defense in depth: reqsign POSTs the OAuth token request — carrying the
469            // client_secret to this host. Require a bare https origin: no userinfo,
470            // no query, no fragment, and no path beyond "/". The value itself is not
471            // echoed into error messages in case a user pasted a secret by mistake.
472            if let Some(host) = sp_authority {
473                let parsed = Url::parse(host).map_err(|_| {
474                    anyhow!(
475                        "adlsgen2.authority_host does not parse as a URL ({} chars)",
476                        host.len()
477                    )
478                })?;
479                if parsed.scheme() != "https" {
480                    bail!(
481                        "adlsgen2.authority_host must use the https scheme, got {}",
482                        parsed.scheme()
483                    );
484                }
485                if !parsed.username().is_empty() || parsed.password().is_some() {
486                    bail!("adlsgen2.authority_host must not contain userinfo");
487                }
488                if parsed.query().is_some() || parsed.fragment().is_some() {
489                    bail!("adlsgen2.authority_host must not contain a query or fragment");
490                }
491                if !matches!(parsed.path(), "" | "/") {
492                    bail!("adlsgen2.authority_host must not contain a path component");
493                }
494            }
495
496            if let (Some(account_name), Some(account_key)) = (sk_account_name, sk_account_key) {
497                iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.to_owned());
498                iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.to_owned());
499                require_rest("adlsgen2")?;
500            }
501
502            if let (Some(tenant_id), Some(client_id), Some(client_secret)) =
503                (sp_tenant, sp_client, sp_secret)
504            {
505                iceberg_configs.insert(ADLS_TENANT_ID.to_owned(), tenant_id.to_owned());
506                iceberg_configs.insert(ADLS_CLIENT_ID.to_owned(), client_id.to_owned());
507                iceberg_configs.insert(ADLS_CLIENT_SECRET.to_owned(), client_secret.to_owned());
508                // Strip trailing slash to prevent double slash
509                let authority_host = sp_authority
510                    .unwrap_or(ADLS_DEFAULT_AUTHORITY_HOST)
511                    .trim_end_matches('/')
512                    .to_owned();
513                iceberg_configs.insert(ADLS_AUTHORITY_HOST.to_owned(), authority_host);
514                require_rest("adlsgen2")?;
515            }
516
517            match &self.warehouse_path {
518                Some(warehouse_path) => {
519                    let (bucket, _) = {
520                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
521                        // Lakehouse Iceberg REST catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables.
522                        let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
523                        let url = Url::parse(warehouse_path);
524                        if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
525                            && (catalog_type == "rest" || catalog_type == "rest_rust")
526                        {
527                            // If the warehouse path is not a valid URL, it could be:
528                            // - A warehouse name in REST catalog
529                            // - An S3 Tables path (arn:aws:s3tables:...)
530                            // - A Lakehouse path (bq://projects/...) for Google Cloud BigQuery integration
531                            // We allow these to pass through for REST catalogs.
532                            (None, None)
533                        } else {
534                            let url = url.with_context(|| {
535                                format!("Invalid warehouse path: {}", warehouse_path)
536                            })?;
537                            let bucket = url
538                                .host_str()
539                                .with_context(|| {
540                                    format!(
541                                        "Invalid s3 path: {}, bucket is missing",
542                                        warehouse_path
543                                    )
544                                })?
545                                .to_owned();
546                            let root = url.path().trim_start_matches('/').to_owned();
547                            (Some(bucket), Some(root))
548                        }
549                    };
550
551                    if let Some(bucket) = bucket {
552                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
553                    }
554                }
555                None => {
556                    if catalog_type != "rest" && catalog_type != "rest_rust" {
557                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
558                    }
559                }
560            }
561            iceberg_configs.insert(
562                S3_DISABLE_CONFIG_LOAD.to_owned(),
563                (!enable_config_load).to_string(),
564            );
565
566            iceberg_configs.insert(
567                GCS_DISABLE_CONFIG_LOAD.to_owned(),
568                (!enable_config_load).to_string(),
569            );
570
571            if let Some(path_style_access) = self.s3_path_style_access {
572                iceberg_configs.insert(
573                    S3_PATH_STYLE_ACCESS.to_owned(),
574                    path_style_access.to_string(),
575                );
576            }
577
578            iceberg_configs
579        };
580
581        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
582        let mut java_catalog_configs = HashMap::new();
583        {
584            if let Some(uri) = self.catalog_uri.as_deref() {
585                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
586            }
587
588            if let Some(warehouse_path) = &self.warehouse_path {
589                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
590            }
591            java_catalog_configs.extend(java_catalog_props.clone());
592
593            // Set io-impl: use custom io-impl if provided, otherwise default to S3FileIO
594            let io_impl = self
595                .catalog_io_impl
596                .clone()
597                .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
598            java_catalog_configs.insert("io-impl".to_owned(), io_impl);
599
600            // suppress log of FileIO like: Unclosed FileIO instance created by...
601            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
602
603            if let Some(region) = &self.s3_region {
604                java_catalog_configs.insert("client.region".to_owned(), region.clone());
605            }
606            if let Some(endpoint) = &self.s3_endpoint {
607                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
608            }
609
610            if let Some(access_key) = &self.s3_access_key {
611                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
612            }
613            if let Some(secret_key) = &self.s3_secret_key {
614                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
615            }
616
617            if let Some(path_style_access) = &self.s3_path_style_access {
618                java_catalog_configs.insert(
619                    "s3.path-style-access".to_owned(),
620                    path_style_access.to_string(),
621                );
622            }
623
624            let headers = self.headers()?;
625            for (header_name, header_value) in headers {
626                java_catalog_configs.insert(format!("header.{}", header_name), header_value);
627            }
628
629            match self.catalog_type() {
630                "rest" => {
631                    // Handle security type for REST catalog (Iceberg 1.10+)
632                    if let Some(security) = &self.catalog_security {
633                        match security.to_lowercase().as_str() {
634                            "google" => {
635                                // Google AuthManager (Iceberg 1.10+) - uses Google ADC
636                                java_catalog_configs.insert(
637                                    "rest.auth.type".to_owned(),
638                                    "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
639                                );
640                                // Set GCP auth scopes if provided
641                                if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
642                                    java_catalog_configs.insert(
643                                        "gcp.auth.scopes".to_owned(),
644                                        gcp_auth_scopes.clone(),
645                                    );
646                                }
647                            }
648                            "oauth2" => {
649                                // Standard OAuth2 authentication
650                                if let Some(credential) = &self.catalog_credential {
651                                    java_catalog_configs
652                                        .insert("credential".to_owned(), credential.clone());
653                                }
654                                if let Some(token) = &self.catalog_token {
655                                    java_catalog_configs.insert("token".to_owned(), token.clone());
656                                }
657                                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
658                                    java_catalog_configs.insert(
659                                        "oauth2-server-uri".to_owned(),
660                                        oauth2_server_uri.clone(),
661                                    );
662                                }
663                                if let Some(scope) = &self.catalog_scope {
664                                    java_catalog_configs.insert("scope".to_owned(), scope.clone());
665                                }
666                            }
667                            "none" | "" => {
668                                // No authentication
669                            }
670                            _ => {
671                                tracing::warn!(
672                                    "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
673                                    security
674                                );
675                            }
676                        }
677                    } else {
678                        // Legacy behavior: use individual OAuth2 properties if security type not specified
679                        if let Some(credential) = &self.catalog_credential {
680                            java_catalog_configs
681                                .insert("credential".to_owned(), credential.clone());
682                        }
683                        if let Some(token) = &self.catalog_token {
684                            java_catalog_configs.insert("token".to_owned(), token.clone());
685                        }
686                        if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
687                            java_catalog_configs
688                                .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
689                        }
690                        if let Some(scope) = &self.catalog_scope {
691                            java_catalog_configs.insert("scope".to_owned(), scope.clone());
692                        }
693                    }
694                    if let Some(rest_signing_region) = &self.rest_signing_region {
695                        java_catalog_configs.insert(
696                            "rest.signing-region".to_owned(),
697                            rest_signing_region.clone(),
698                        );
699                    }
700                    if let Some(rest_signing_name) = &self.rest_signing_name {
701                        java_catalog_configs
702                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
703                    }
704                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
705                        java_catalog_configs.insert(
706                            "rest.sigv4-enabled".to_owned(),
707                            rest_sigv4_enabled.to_string(),
708                        );
709
710                        if let Some(access_key) = &self.s3_access_key {
711                            java_catalog_configs
712                                .insert("rest.access-key-id".to_owned(), access_key.clone());
713                        }
714
715                        if let Some(secret_key) = &self.s3_secret_key {
716                            java_catalog_configs
717                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
718                        }
719                    }
720                }
721                "glue" => {
722                    let glue_access_key = self.glue_access_key();
723                    let glue_secret_key = self.glue_secret_key();
724                    let has_glue_credentials =
725                        glue_access_key.is_some() && glue_secret_key.is_some();
726                    let should_configure_glue_provider = !enable_config_load
727                        || has_glue_credentials
728                        || self.glue_iam_role_arn.is_some();
729
730                    if should_configure_glue_provider {
731                        java_catalog_configs.insert(
732                            "client.credentials-provider".to_owned(),
733                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
734                        );
735                        if let Some(region) = self.glue_region() {
736                            java_catalog_configs.insert(
737                                "client.credentials-provider.glue.region".to_owned(),
738                                region.to_owned(),
739                            );
740                        }
741                        if let Some(access_key) = glue_access_key {
742                            java_catalog_configs.insert(
743                                "client.credentials-provider.glue.access-key-id".to_owned(),
744                                access_key.to_owned(),
745                            );
746                        }
747                        if let Some(secret_key) = glue_secret_key {
748                            java_catalog_configs.insert(
749                                "client.credentials-provider.glue.secret-access-key".to_owned(),
750                                secret_key.to_owned(),
751                            );
752                        }
753                        if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
754                            java_catalog_configs.insert(
755                                "client.credentials-provider.glue.iam-role-arn".to_owned(),
756                                role_arn.to_owned(),
757                            );
758                        }
759                        if enable_config_load && !has_glue_credentials {
760                            java_catalog_configs.insert(
761                                "client.credentials-provider.glue.use-default-credential-chain"
762                                    .to_owned(),
763                                "true".to_owned(),
764                            );
765                        }
766                    }
767
768                    if let Some(region) = self.glue_region() {
769                        java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
770                        java_catalog_configs.insert(
771                            "glue.endpoint".to_owned(),
772                            format!("https://glue.{}.amazonaws.com", region),
773                        );
774                    }
775
776                    if let Some(glue_id) = self.glue_id.as_deref() {
777                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
778                    }
779                    self.apply_java_s3_file_io_assume_role_configs(&mut java_catalog_configs);
780                }
781                "jdbc" => {
782                    self.apply_java_aws_client_assume_role_configs(&mut java_catalog_configs);
783                }
784                _ => {}
785            }
786        }
787
788        Ok((file_io_props, java_catalog_configs))
789    }
790
791    fn apply_java_s3_file_io_assume_role_configs(
792        &self,
793        java_catalog_configs: &mut HashMap<String, String>,
794    ) {
795        if let Some(iam_role_arn) = &self.s3_iam_role_arn {
796            java_catalog_configs.insert(
797                "s3.client-factory-impl".to_owned(),
798                "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory".to_owned(),
799            );
800            java_catalog_configs.insert("s3.iam-role-arn".to_owned(), iam_role_arn.clone());
801        }
802    }
803
804    fn apply_java_aws_client_assume_role_configs(
805        &self,
806        java_catalog_configs: &mut HashMap<String, String>,
807    ) {
808        if let Some(iam_role_arn) = &self.s3_iam_role_arn {
809            java_catalog_configs.insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
810            java_catalog_configs.insert(
811                "client.factory".to_owned(),
812                "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
813            );
814            if let Some(region) = &self.s3_region {
815                java_catalog_configs.insert("client.assume-role.region".to_owned(), region.clone());
816            }
817        }
818    }
819}
820
821impl IcebergCommon {
822    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
823    pub async fn create_catalog(
824        &self,
825        java_catalog_props: &HashMap<String, String>,
826    ) -> ConnectorResult<Arc<dyn Catalog>> {
827        match self.catalog_type() {
828            "storage" => {
829                let warehouse = self
830                    .warehouse_path
831                    .clone()
832                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
833                let url = Url::parse(warehouse.as_ref())
834                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
835
836                let config = match url.scheme() {
837                    "s3" | "s3a" => StorageCatalogConfig::S3(
838                        storage_catalog::StorageCatalogS3Config::builder()
839                            .warehouse(warehouse)
840                            .access_key(self.s3_access_key.clone())
841                            .secret_key(self.s3_secret_key.clone())
842                            .region(self.s3_region.clone())
843                            .endpoint(self.s3_endpoint.clone())
844                            .path_style_access(self.s3_path_style_access)
845                            .enable_config_load(Some(self.enable_config_load()))
846                            .build(),
847                    ),
848                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
849                        storage_catalog::StorageCatalogGcsConfig::builder()
850                            .warehouse(warehouse)
851                            .credential(self.gcs_credential.clone())
852                            .enable_config_load(Some(self.enable_config_load()))
853                            .build(),
854                    ),
855                    "azblob" => StorageCatalogConfig::Azblob(
856                        storage_catalog::StorageCatalogAzblobConfig::builder()
857                            .warehouse(warehouse)
858                            .account_name(self.azblob_account_name.clone())
859                            .account_key(self.azblob_account_key.clone())
860                            .endpoint(self.azblob_endpoint_url.clone())
861                            .build(),
862                    ),
863                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
864                };
865
866                let catalog = storage_catalog::StorageCatalog::new(config)?;
867                Ok(Arc::new(catalog))
868            }
869            "rest_rust" => {
870                let mut iceberg_configs = HashMap::new();
871
872                // check gcs credential or s3 access key and secret key
873                if let Some(gcs_credential) = &self.gcs_credential {
874                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
875                } else {
876                    if let Some(region) = &self.s3_region {
877                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
878                    }
879                    if let Some(endpoint) = &self.s3_endpoint {
880                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
881                    }
882                    if let Some(access_key) = &self.s3_access_key {
883                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
884                    }
885                    if let Some(secret_key) = &self.s3_secret_key {
886                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
887                    }
888                    if let Some(path_style_access) = &self.s3_path_style_access {
889                        iceberg_configs.insert(
890                            S3_PATH_STYLE_ACCESS.to_owned(),
891                            path_style_access.to_string(),
892                        );
893                    }
894                };
895
896                if let Some(credential) = &self.catalog_credential {
897                    iceberg_configs.insert("credential".to_owned(), credential.clone());
898                }
899                if let Some(token) = &self.catalog_token {
900                    iceberg_configs.insert("token".to_owned(), token.clone());
901                }
902                if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
903                    iceberg_configs
904                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
905                }
906                if let Some(scope) = &self.catalog_scope {
907                    iceberg_configs.insert("scope".to_owned(), scope.clone());
908                }
909
910                let headers = self.headers()?;
911                for (header_name, header_value) in headers {
912                    iceberg_configs.insert(format!("header.{}", header_name), header_value);
913                }
914
915                iceberg_configs.insert(
916                    iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
917                    self.catalog_uri
918                        .clone()
919                        .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
920                );
921                if let Some(warehouse_path) = &self.warehouse_path {
922                    iceberg_configs.insert(
923                        iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
924                        warehouse_path.clone(),
925                    );
926                }
927                let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
928                    .load("rest", iceberg_configs)
929                    .await
930                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
931                Ok(Arc::new(catalog))
932            }
933            "glue_rust" => {
934                let mut iceberg_configs = HashMap::new();
935                // glue
936                if let Some(region) = self.glue_region() {
937                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
938                }
939                if let Some(access_key) = self.glue_access_key() {
940                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
941                }
942                if let Some(secret_key) = self.glue_secret_key() {
943                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
944                }
945                // s3
946                if let Some(region) = &self.s3_region {
947                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
948                }
949                if let Some(endpoint) = &self.s3_endpoint {
950                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
951                }
952                if let Some(access_key) = &self.s3_access_key {
953                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
954                }
955                if let Some(secret_key) = &self.s3_secret_key {
956                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
957                }
958                if let Some(role_arn) = &self.s3_iam_role_arn {
959                    iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
960                }
961                if let Some(path_style_access) = &self.s3_path_style_access {
962                    iceberg_configs.insert(
963                        S3_PATH_STYLE_ACCESS.to_owned(),
964                        path_style_access.to_string(),
965                    );
966                }
967                iceberg_configs.insert(
968                    iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
969                    self.warehouse_path
970                        .clone()
971                        .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
972                );
973                if let Some(uri) = self.catalog_uri.as_deref() {
974                    iceberg_configs.insert(
975                        iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
976                        uri.to_owned(),
977                    );
978                }
979                let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
980                    .load("glue", iceberg_configs)
981                    .await
982                    .map_err(|e| anyhow!(IcebergError::from(e)))?;
983                Ok(Arc::new(catalog))
984            }
985            catalog_type
986                if catalog_type == "hive"
987                    || catalog_type == "snowflake"
988                    || catalog_type == "jdbc"
989                    || catalog_type == "rest"
990                    || catalog_type == "glue" =>
991            {
992                // Create java catalog
993                let (file_io_props, java_catalog_props) =
994                    self.build_jni_catalog_configs(java_catalog_props)?;
995                let catalog_impl = match catalog_type {
996                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
997                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
998                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
999                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
1000                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
1001                    _ => unreachable!(),
1002                };
1003
1004                jni_catalog::JniCatalog::build_catalog(
1005                    file_io_props,
1006                    self.catalog_name(),
1007                    catalog_impl,
1008                    java_catalog_props,
1009                )
1010            }
1011            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
1012            _ => {
1013                bail!(
1014                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
1015                    self.catalog_type()
1016                )
1017            }
1018        }
1019    }
1020
1021    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
1022    pub async fn load_table(
1023        &self,
1024        table: &IcebergTableIdentifier,
1025        java_catalog_props: &HashMap<String, String>,
1026    ) -> ConnectorResult<Table> {
1027        let catalog = self
1028            .create_catalog(java_catalog_props)
1029            .await
1030            .context("Unable to load iceberg catalog")?;
1031
1032        let table_id = table
1033            .to_table_ident()
1034            .context("Unable to parse table name")?;
1035
1036        let table = catalog.load_table(&table_id).await?;
1037        Ok(rebuild_table_with_shared_cache(table).await)
1038    }
1039}
1040
1041/// Get a globally shared object cache keyed by table UUID to avoid reuse after drop & recreate.
1042pub(crate) async fn shared_object_cache(
1043    init_object_cache: Arc<ObjectCache>,
1044    table_uuid: Uuid,
1045) -> Arc<ObjectCache> {
1046    static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
1047        MokaCache::builder()
1048            .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
1049            .build()
1050    });
1051
1052    CACHE
1053        .get_with(table_uuid, async { init_object_cache })
1054        .await
1055}
1056
1057pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
1058    let table_uuid = table.metadata().uuid();
1059    let init_object_cache = table.object_cache();
1060    let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
1061    table.with_object_cache(object_cache)
1062}
1063
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Baseline config for the given catalog type: only the S3 region and an S3 warehouse path
    /// are set, every other option is `None`.
    fn test_common(catalog_type: &str) -> IcebergCommon {
        IcebergCommon {
            catalog_type: Some(catalog_type.to_owned()),
            s3_region: Some("ap-southeast-2".to_owned()),
            s3_endpoint: None,
            s3_access_key: None,
            s3_secret_key: None,
            s3_iam_role_arn: None,
            glue_access_key: None,
            glue_secret_key: None,
            glue_iam_role_arn: None,
            glue_region: None,
            glue_id: None,
            gcs_credential: None,
            azblob_account_name: None,
            azblob_account_key: None,
            azblob_endpoint_url: None,
            adlsgen2_account_name: None,
            adlsgen2_account_key: None,
            adlsgen2_endpoint: None,
            adlsgen2_tenant_id: None,
            adlsgen2_client_id: None,
            adlsgen2_client_secret: None,
            adlsgen2_authority_host: None,
            warehouse_path: Some("s3://bucket/warehouse".to_owned()),
            catalog_name: None,
            catalog_uri: None,
            catalog_credential: None,
            catalog_token: None,
            catalog_oauth2_server_uri: None,
            catalog_scope: None,
            rest_signing_region: None,
            rest_signing_name: None,
            rest_sigv4_enabled: None,
            s3_path_style_access: None,
            enable_config_load: None,
            hosted_catalog: None,
            catalog_header: None,
            vended_credentials: None,
            catalog_security: None,
            gcp_auth_scopes: None,
            catalog_io_impl: None,
        }
    }

    /// REST-catalog config using ADLS Gen2 service-principal auth, with an optional
    /// authority-host override.
    fn test_adlsgen2_service_principal_common(authority_host: Option<&str>) -> IcebergCommon {
        IcebergCommon {
            adlsgen2_account_name: Some("acct".to_owned()),
            adlsgen2_tenant_id: Some("tenant-uuid".to_owned()),
            adlsgen2_client_id: Some("client-uuid".to_owned()),
            adlsgen2_client_secret: Some("secret-value".to_owned()),
            adlsgen2_authority_host: authority_host.map(str::to_owned),
            warehouse_path: Some("abfss://wh@acct.dfs.core.windows.net/wh".to_owned()),
            ..test_common("rest")
        }
    }

    /// A Glue JNI catalog with an S3 role ARN must configure the S3 FileIO client factory and
    /// must not set `client.factory`.
    #[test]
    fn test_glue_jni_catalog_uses_s3_assume_role_for_file_io() {
        let role_arn = "arn:aws:iam::123456789012:role/risingwave-s3";
        let common = IcebergCommon {
            s3_iam_role_arn: Some(role_arn.to_owned()),
            ..test_common("glue")
        };

        let (_, java_catalog_configs) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();

        let expected_entries = [
            (
                "s3.client-factory-impl",
                "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory",
            ),
            ("s3.iam-role-arn", role_arn),
        ];
        for (key, expected) in expected_entries {
            assert_eq!(java_catalog_configs.get(key).unwrap(), expected);
        }
        assert!(!java_catalog_configs.contains_key("client.factory"));
    }

    /// Service-principal credentials reach the file IO props, and the authority host falls
    /// back to the default when not overridden.
    #[test]
    fn test_adlsgen2_service_principal_populates_file_io_configs_with_default_authority_host() {
        let common = test_adlsgen2_service_principal_common(None);

        let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();

        for (key, expected) in [
            (ADLS_TENANT_ID, "tenant-uuid"),
            (ADLS_CLIENT_ID, "client-uuid"),
            (ADLS_CLIENT_SECRET, "secret-value"),
        ] {
            assert_eq!(file_io_props.get(key).unwrap(), expected);
        }
        assert_eq!(
            file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
            ADLS_DEFAULT_AUTHORITY_HOST
        );
    }

    /// An explicit authority host is passed through unchanged.
    #[test]
    fn test_adlsgen2_service_principal_authority_host_override_is_respected() {
        let override_host = "https://login.microsoftonline.us";
        let common = test_adlsgen2_service_principal_common(Some(override_host));

        let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();

        assert_eq!(
            file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
            "https://login.microsoftonline.us"
        );
    }

    /// Anything other than a bare `https` origin is rejected with a specific message.
    #[test]
    fn test_adlsgen2_authority_host_rejects_non_bare_https_origins() {
        let cases = [
            ("not a url", "does not parse as a URL"),
            (
                "http://login.microsoftonline.com",
                "must use the https scheme",
            ),
            (
                "https://user:pass@login.microsoftonline.com",
                "must not contain userinfo",
            ),
            (
                "https://login.microsoftonline.com?bar=baz",
                "must not contain a query or fragment",
            ),
            (
                "https://login.microsoftonline.com#frag",
                "must not contain a query or fragment",
            ),
            (
                "https://login.microsoftonline.com/foo",
                "must not contain a path component",
            ),
        ];
        for &(authority_host, expected_error) in &cases {
            let err = test_adlsgen2_service_principal_common(Some(authority_host))
                .build_jni_catalog_configs(&HashMap::new())
                .unwrap_err();
            let rendered = format!("{:#}", err);
            assert!(
                rendered.contains(expected_error),
                "authority_host {authority_host:?}: expected error containing {expected_error:?}, got: {rendered}"
            );
        }
    }

    /// A single trailing slash on the authority host is stripped.
    #[test]
    fn test_adlsgen2_authority_host_trailing_slash_is_normalized() {
        let with_trailing_slash = "https://login.microsoftonline.us/";
        let common = test_adlsgen2_service_principal_common(Some(with_trailing_slash));

        let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();

        assert_eq!(
            file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
            "https://login.microsoftonline.us"
        );
    }

    /// Supplying both a shared key and service-principal credentials is an error.
    #[test]
    fn test_adlsgen2_rejects_mixing_shared_key_and_service_principal() {
        let common = IcebergCommon {
            adlsgen2_account_key: Some("shared-key".to_owned()),
            ..test_adlsgen2_service_principal_common(None)
        };

        let err = common
            .build_jni_catalog_configs(&HashMap::new())
            .unwrap_err();
        let rendered = format!("{:#}", err);
        assert!(
            rendered.contains("exactly one auth mode"),
            "expected mutual-exclusion error, got: {rendered}"
        );
    }

    /// Service-principal auth with a missing client secret is an error.
    #[test]
    fn test_adlsgen2_rejects_partial_service_principal_config() {
        let common = IcebergCommon {
            adlsgen2_client_secret: None,
            ..test_adlsgen2_service_principal_common(None)
        };

        let err = common
            .build_jni_catalog_configs(&HashMap::new())
            .unwrap_err();
        let rendered = format!("{:#}", err);
        assert!(
            rendered.contains("requires all three"),
            "expected partial-config error, got: {rendered}"
        );
    }

    /// `validate` accepts well-formed identifiers and rejects ones with an empty
    /// dot-separated part.
    #[test]
    fn test_iceberg_table_identifier_validation() {
        let accepted = [
            (Some("valid_db"), "test_table"),
            (Some("valid_db_name"), "test_table"),
            (None, "test_table"),
        ];
        for (database_name, table_name) in accepted {
            let identifier = IcebergTableIdentifier {
                database_name: database_name.map(str::to_owned),
                table_name: table_name.to_owned(),
            };
            assert!(identifier.validate().is_ok());
        }

        // An empty part in the database name, then a leading dot in the table name.
        let rejected = [(Some("a..b"), "test_table"), (None, ".test_table")];
        for (database_name, table_name) in rejected {
            let identifier = IcebergTableIdentifier {
                database_name: database_name.map(str::to_owned),
                table_name: table_name.to_owned(),
            };
            let result = identifier.validate();
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("identifier parts must not be empty")
            );
        }
    }

    /// Dots in either the database name or the table name act as namespace separators, so all
    /// three spellings resolve to the same namespace and table name.
    #[test]
    fn test_iceberg_table_identifier_dots_as_namespace_separators() {
        let spellings = [
            (Some("general.zia.stats"), "tagged_security_transactions"),
            (Some("general"), "zia.stats.tagged_security_transactions"),
            (None, "general.zia.stats.tagged_security_transactions"),
        ];

        for (database_name, table_name) in spellings {
            let table_ident = IcebergTableIdentifier {
                database_name: database_name.map(str::to_owned),
                table_name: table_name.to_owned(),
            }
            .to_table_ident()
            .unwrap();

            let namespace: Vec<_> = table_ident
                .namespace()
                .as_ref()
                .iter()
                .map(String::as_str)
                .collect();
            assert_eq!(namespace, vec!["general", "zia", "stats"]);
            assert_eq!(table_ident.name(), "tagged_security_transactions");
        }
    }
}