risingwave_connector/connector_common/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod jni_catalog;
16mod mock_catalog;
17mod storage_catalog;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
22use ::iceberg::table::Table;
23use ::iceberg::{Catalog, TableIdent};
24use anyhow::{Context, anyhow};
25use iceberg::io::{
26    AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
27    GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD,
28};
29use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
30use phf::{Set, phf_set};
31use risingwave_common::bail;
32use serde_derive::Deserialize;
33use serde_with::serde_as;
34use url::Url;
35use with_options::WithOptions;
36
37use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
38use crate::deserialize_optional_bool_from_string;
39use crate::enforce_secret::EnforceSecret;
40use crate::error::ConnectorResult;
41
// Connection, storage and catalog options shared by the Iceberg connectors.
//
// Every field is read from the WITH-option key given in its `#[serde(rename = ...)]`
// attribute; `enable_config_load` has no rename, so its key is the field name itself.
// NOTE(review): the existing `///` field docs are intentionally left untouched because the
// `WithOptions` tooling may surface them; the plain `//` notes are reviewer documentation.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg, such as "storage", "rest".
    // If not set, we use "storage" as default.
    // Values dispatched by `create_catalog` in this file: "storage", "rest_rust", "glue_rust",
    // the JNI-backed "hive", "snowflake", "jdbc", "rest", "glue", and "mock".
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 region. The `glue` / `glue_rust` catalogs also reuse it as the Glue client region.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    // S3 endpoint override, forwarded to both the Rust file IO and the Java S3FileIO.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    // S3 access key and secret key. Also reused as the Glue credentials and, when
    // `catalog.rest.sigv4_enabled` is set, as the REST catalog's SigV4 credentials.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    // GCS credential, passed to the file IO under `GCS_CREDENTIALS_JSON`. Among the
    // JNI-backed catalogs it is only accepted by the REST catalog.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob Storage account name / key / endpoint. The JNI-backed catalogs apply them
    // only when all three are set, and only for the REST catalog.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // Required by every catalog type except the REST variants, where it may also be a
    // plain warehouse name or an S3 Tables ARN instead of a URL.
    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    // Forwarded as `glue.id` to the Java Glue catalog only.
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    // Mandatory for `rest_rust`; optional for `glue_rust`, and forwarded as `uri` to the
    // JNI-backed catalogs.
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    // Optional namespace; when set, `full_table_name` combines it with `table.name`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    // NOTE(review): `full_table_name` also supports a separate `database.name`, so the
    // "must include schema name" wording below may be stale — confirm and update.
    /// Full name of table, must include schema name.
    #[serde(rename = "table.name")]
    pub table_name: String,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the OAuth2 client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for OAuth2.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    // When set (to either value), the REST JNI catalog also receives the S3 access/secret
    // keys as `rest.access-key-id` / `rest.secret-access-key`.
    /// Whether to use SigV4 for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded only to the JNI-backed catalogs, as the Java property `s3.path-style-access`.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    // Treated as false when unset. When false, the JNI catalogs' file IO gets
    // `S3_DISABLE_CONFIG_LOAD` / `GCS_DISABLE_CONFIG_LOAD` = "true", and the Java Glue catalog
    // uses RisingWave's `GlueCredentialProvider` fed with the S3 access/secret keys.
    /// enable config load.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    // Not referenced anywhere else in this file.
    /// This is only used by iceberg engine to enable the hosted catalog.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,
}
137
138impl EnforceSecret for IcebergCommon {
139    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
140        "s3.access.key",
141        "s3.secret.key",
142        "gcs.credential",
143        "catalog.credential",
144        "catalog.token",
145        "catalog.oauth2_server_uri",
146    };
147}
148
149impl IcebergCommon {
150    pub fn catalog_type(&self) -> &str {
151        self.catalog_type.as_deref().unwrap_or("storage")
152    }
153
154    pub fn catalog_name(&self) -> String {
155        self.catalog_name
156            .as_ref()
157            .map(|s| s.to_string())
158            .unwrap_or_else(|| "risingwave".to_owned())
159    }
160
161    /// For both V1 and V2.
162    fn build_jni_catalog_configs(
163        &self,
164        java_catalog_props: &HashMap<String, String>,
165    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
166        let mut iceberg_configs = HashMap::new();
167        let enable_config_load = self.enable_config_load.unwrap_or(false);
168        let file_io_props = {
169            let catalog_type = self.catalog_type().to_owned();
170
171            if let Some(region) = &self.region {
172                // iceberg-rust
173                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
174            }
175
176            if let Some(endpoint) = &self.endpoint {
177                // iceberg-rust
178                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
179            }
180
181            // iceberg-rust
182            if let Some(access_key) = &self.access_key {
183                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
184            }
185            if let Some(secret_key) = &self.secret_key {
186                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
187            }
188            if let Some(gcs_credential) = &self.gcs_credential {
189                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
190                if catalog_type != "rest" && catalog_type != "rest_rust" {
191                    bail!("gcs unsupported in {} catalog", &catalog_type);
192                }
193            }
194
195            if let (
196                Some(azblob_account_name),
197                Some(azblob_account_key),
198                Some(azblob_endpoint_url),
199            ) = (
200                &self.azblob_account_name,
201                &self.azblob_account_key,
202                &self.azblob_endpoint_url,
203            ) {
204                iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
205                iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
206                iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
207
208                if catalog_type != "rest" && catalog_type != "rest_rust" {
209                    bail!("azblob unsupported in {} catalog", &catalog_type);
210                }
211            }
212
213            match &self.warehouse_path {
214                Some(warehouse_path) => {
215                    let (bucket, _) = {
216                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
217                        let url = Url::parse(warehouse_path);
218                        if (url.is_err() || is_s3_tables)
219                            && (catalog_type == "rest" || catalog_type == "rest_rust")
220                        {
221                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
222                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
223                            // so we allow it to pass here.
224                            (None, None)
225                        } else {
226                            let url = url.with_context(|| {
227                                format!("Invalid warehouse path: {}", warehouse_path)
228                            })?;
229                            let bucket = url
230                                .host_str()
231                                .with_context(|| {
232                                    format!(
233                                        "Invalid s3 path: {}, bucket is missing",
234                                        warehouse_path
235                                    )
236                                })?
237                                .to_owned();
238                            let root = url.path().trim_start_matches('/').to_owned();
239                            (Some(bucket), Some(root))
240                        }
241                    };
242
243                    if let Some(bucket) = bucket {
244                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
245                    }
246                }
247                None => {
248                    if catalog_type != "rest" && catalog_type != "rest_rust" {
249                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
250                    }
251                }
252            }
253            iceberg_configs.insert(
254                S3_DISABLE_CONFIG_LOAD.to_owned(),
255                (!enable_config_load).to_string(),
256            );
257
258            iceberg_configs.insert(
259                GCS_DISABLE_CONFIG_LOAD.to_owned(),
260                (!enable_config_load).to_string(),
261            );
262
263            iceberg_configs
264        };
265
266        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
267        let mut java_catalog_configs = HashMap::new();
268        {
269            if let Some(uri) = self.catalog_uri.as_deref() {
270                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
271            }
272
273            if let Some(warehouse_path) = &self.warehouse_path {
274                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
275            }
276            java_catalog_configs.extend(java_catalog_props.clone());
277
278            // Currently we only support s3, so let's set it to s3
279            java_catalog_configs.insert(
280                "io-impl".to_owned(),
281                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
282            );
283
284            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
285            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
286
287            if let Some(region) = &self.region {
288                java_catalog_configs.insert("client.region".to_owned(), region.clone());
289            }
290            if let Some(endpoint) = &self.endpoint {
291                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
292            }
293
294            if let Some(access_key) = &self.access_key {
295                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
296            }
297            if let Some(secret_key) = &self.secret_key {
298                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
299            }
300
301            if let Some(path_style_access) = self.path_style_access {
302                java_catalog_configs.insert(
303                    "s3.path-style-access".to_owned(),
304                    path_style_access.to_string(),
305                );
306            }
307
308            match self.catalog_type.as_deref() {
309                Some("rest") => {
310                    if let Some(credential) = &self.credential {
311                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
312                    }
313                    if let Some(token) = &self.token {
314                        java_catalog_configs.insert("token".to_owned(), token.clone());
315                    }
316                    if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
317                        java_catalog_configs
318                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
319                    }
320                    if let Some(scope) = &self.scope {
321                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
322                    }
323                    if let Some(rest_signing_region) = &self.rest_signing_region {
324                        java_catalog_configs.insert(
325                            "rest.signing-region".to_owned(),
326                            rest_signing_region.clone(),
327                        );
328                    }
329                    if let Some(rest_signing_name) = &self.rest_signing_name {
330                        java_catalog_configs
331                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
332                    }
333                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
334                        java_catalog_configs.insert(
335                            "rest.sigv4-enabled".to_owned(),
336                            rest_sigv4_enabled.to_string(),
337                        );
338
339                        if let Some(access_key) = &self.access_key {
340                            java_catalog_configs
341                                .insert("rest.access-key-id".to_owned(), access_key.clone());
342                        }
343
344                        if let Some(secret_key) = &self.secret_key {
345                            java_catalog_configs
346                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
347                        }
348                    }
349                }
350                Some("glue") => {
351                    if !enable_config_load {
352                        java_catalog_configs.insert(
353                            "client.credentials-provider".to_owned(),
354                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
355                        );
356                        // Use S3 ak/sk and region as glue ak/sk and region by default.
357                        // TODO: use different ak/sk and region for s3 and glue.
358                        if let Some(access_key) = &self.access_key {
359                            java_catalog_configs.insert(
360                                "client.credentials-provider.glue.access-key-id".to_owned(),
361                                access_key.clone(),
362                            );
363                        }
364                        if let Some(secret_key) = &self.secret_key {
365                            java_catalog_configs.insert(
366                                "client.credentials-provider.glue.secret-access-key".to_owned(),
367                                secret_key.clone(),
368                            );
369                        }
370                    }
371
372                    if let Some(region) = &self.region {
373                        java_catalog_configs.insert("client.region".to_owned(), region.clone());
374                        java_catalog_configs.insert(
375                            "glue.endpoint".to_owned(),
376                            format!("https://glue.{}.amazonaws.com", region),
377                        );
378                    }
379
380                    if let Some(glue_id) = self.glue_id.as_deref() {
381                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
382                    }
383                }
384                _ => {}
385            }
386        }
387
388        Ok((file_io_props, java_catalog_configs))
389    }
390}
391
392impl IcebergCommon {
393    pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
394        let ret = if let Some(database_name) = &self.database_name {
395            TableIdent::from_strs(vec![database_name, &self.table_name])
396        } else {
397            TableIdent::from_strs(vec![&self.table_name])
398        };
399
400        Ok(ret.context("Failed to create table identifier")?)
401    }
402
403    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
404    pub async fn create_catalog(
405        &self,
406        java_catalog_props: &HashMap<String, String>,
407    ) -> ConnectorResult<Arc<dyn Catalog>> {
408        match self.catalog_type() {
409            "storage" => {
410                let warehouse = self
411                    .warehouse_path
412                    .clone()
413                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
414                let url = Url::parse(warehouse.as_ref())
415                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
416
417                let config = match url.scheme() {
418                    "s3" | "s3a" => StorageCatalogConfig::S3(
419                        storage_catalog::StorageCatalogS3Config::builder()
420                            .warehouse(warehouse)
421                            .access_key(self.access_key.clone())
422                            .secret_key(self.secret_key.clone())
423                            .region(self.region.clone())
424                            .endpoint(self.endpoint.clone())
425                            .enable_config_load(self.enable_config_load)
426                            .build(),
427                    ),
428                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
429                        storage_catalog::StorageCatalogGcsConfig::builder()
430                            .warehouse(warehouse)
431                            .credential(self.gcs_credential.clone())
432                            .enable_config_load(self.enable_config_load)
433                            .build(),
434                    ),
435                    "azblob" => StorageCatalogConfig::Azblob(
436                        storage_catalog::StorageCatalogAzblobConfig::builder()
437                            .warehouse(warehouse)
438                            .account_name(self.azblob_account_name.clone())
439                            .account_key(self.azblob_account_key.clone())
440                            .endpoint(self.azblob_endpoint_url.clone())
441                            .build(),
442                    ),
443                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
444                };
445
446                let catalog = storage_catalog::StorageCatalog::new(config)?;
447                Ok(Arc::new(catalog))
448            }
449            "rest_rust" => {
450                let mut iceberg_configs = HashMap::new();
451
452                // check gcs credential or s3 access key and secret key
453                if let Some(gcs_credential) = &self.gcs_credential {
454                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
455                } else {
456                    if let Some(region) = &self.region {
457                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
458                    }
459                    if let Some(endpoint) = &self.endpoint {
460                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
461                    }
462                    if let Some(access_key) = &self.access_key {
463                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
464                    }
465                    if let Some(secret_key) = &self.secret_key {
466                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
467                    }
468                };
469
470                if let Some(credential) = &self.credential {
471                    iceberg_configs.insert("credential".to_owned(), credential.clone());
472                }
473                if let Some(token) = &self.token {
474                    iceberg_configs.insert("token".to_owned(), token.clone());
475                }
476                if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
477                    iceberg_configs
478                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
479                }
480                if let Some(scope) = &self.scope {
481                    iceberg_configs.insert("scope".to_owned(), scope.clone());
482                }
483
484                let config_builder =
485                    iceberg_catalog_rest::RestCatalogConfig::builder()
486                        .uri(self.catalog_uri.clone().with_context(|| {
487                            "`catalog.uri` must be set in rest catalog".to_owned()
488                        })?)
489                        .props(iceberg_configs);
490
491                let config = match &self.warehouse_path {
492                    Some(warehouse_path) => {
493                        config_builder.warehouse(warehouse_path.clone()).build()
494                    }
495                    None => config_builder.build(),
496                };
497                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
498                Ok(Arc::new(catalog))
499            }
500            "glue_rust" => {
501                let mut iceberg_configs = HashMap::new();
502                // glue
503                if let Some(region) = &self.region {
504                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
505                }
506                if let Some(access_key) = &self.access_key {
507                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
508                }
509                if let Some(secret_key) = &self.secret_key {
510                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
511                }
512                // s3
513                if let Some(region) = &self.region {
514                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
515                }
516                if let Some(endpoint) = &self.endpoint {
517                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
518                }
519                if let Some(access_key) = &self.access_key {
520                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
521                }
522                if let Some(secret_key) = &self.secret_key {
523                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
524                }
525                let config_builder =
526                    iceberg_catalog_glue::GlueCatalogConfig::builder()
527                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
528                            anyhow!("`warehouse.path` must be set in glue catalog")
529                        })?)
530                        .props(iceberg_configs);
531                let config = if let Some(uri) = self.catalog_uri.as_deref() {
532                    config_builder.uri(uri.to_owned()).build()
533                } else {
534                    config_builder.build()
535                };
536                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
537                Ok(Arc::new(catalog))
538            }
539            catalog_type
540                if catalog_type == "hive"
541                    || catalog_type == "snowflake"
542                    || catalog_type == "jdbc"
543                    || catalog_type == "rest"
544                    || catalog_type == "glue" =>
545            {
546                // Create java catalog
547                let (file_io_props, java_catalog_props) =
548                    self.build_jni_catalog_configs(java_catalog_props)?;
549                let catalog_impl = match catalog_type {
550                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
551                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
552                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
553                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
554                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
555                    _ => unreachable!(),
556                };
557
558                jni_catalog::JniCatalog::build_catalog(
559                    file_io_props,
560                    self.catalog_name(),
561                    catalog_impl,
562                    java_catalog_props,
563                )
564            }
565            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
566            _ => {
567                bail!(
568                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
569                    self.catalog_type()
570                )
571            }
572        }
573    }
574
575    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
576    pub async fn load_table(
577        &self,
578        java_catalog_props: &HashMap<String, String>,
579    ) -> ConnectorResult<Table> {
580        let catalog = self
581            .create_catalog(java_catalog_props)
582            .await
583            .context("Unable to load iceberg catalog")?;
584
585        let table_id = self
586            .full_table_name()
587            .context("Unable to parse table name")?;
588
589        catalog.load_table(&table_id).await.map_err(Into::into)
590    }
591}