risingwave_connector/connector_common/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod jni_catalog;
16mod mock_catalog;
17mod storage_catalog;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
22use ::iceberg::table::Table;
23use ::iceberg::{Catalog, TableIdent};
24use anyhow::{Context, anyhow};
25use iceberg::io::{GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};
26use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
27use risingwave_common::bail;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use url::Url;
31use with_options::WithOptions;
32
33use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
34use crate::deserialize_optional_bool_from_string;
35use crate::error::ConnectorResult;
36
/// Catalog and object-storage options for Iceberg, deserialized with serde from dotted
/// keys such as `catalog.type` and `s3.region`.
///
/// Apart from `table.name`, every option is optional at deserialization time; the
/// methods on this type check for the ones a given catalog type needs.
// NOTE(review): the field-level `///` comments below are kept verbatim because the
// `WithOptions` derive may consume them — confirm before rewording any of them.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog type supported by iceberg. `create_catalog` accepts "storage", "rest",
    // "rest_rust", "glue", "glue_rust", "hive", "jdbc", "snowflake" and "mock".
    // If not set, we use "storage" as default (see `catalog_type()`).
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 region. For the "glue" catalog it is also used to derive the Glue endpoint.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    // S3 endpoint override.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    // S3 access key. Also reused as the Glue / REST SigV4 access key where applicable.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    // S3 secret key. Also reused as the Glue / REST SigV4 secret key where applicable.
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    // GCS credential, forwarded under `GCS_CREDENTIALS_JSON`. The JNI catalog path
    // rejects it unless the catalog type is "rest" or "rest_rust".
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Path of iceberg warehouse.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// AWS Client id, can be omitted for storage catalog or when
    /// caller's AWS account ID matches glue id
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name, default value is risingwave.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    // Optional database name; `full_table_name` prepends it to `table.name` when set.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    // NOTE(review): `full_table_name` combines this with `database.name` when that is
    // set — confirm whether "must include schema name" below still holds.
    /// Full name of table, must include schema name.
    #[serde(rename = "table.name")]
    pub table_name: String,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the OAuth2 client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2_server_uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for OAuth2.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// The signing region to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// The signing name to use when signing requests to the REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Whether to use SigV4 for signing requests to the REST catalog.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Forwarded to the Java catalog as `s3.path-style-access` when set.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    // Treated as `false` when unset by `build_jni_catalog_configs`, which writes the
    // negated value under `S3_DISABLE_CONFIG_LOAD` and `GCS_DISABLE_CONFIG_LOAD`.
    /// enable config load.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
}
117
118impl IcebergCommon {
119    pub fn catalog_type(&self) -> &str {
120        self.catalog_type.as_deref().unwrap_or("storage")
121    }
122
123    pub fn catalog_name(&self) -> String {
124        self.catalog_name
125            .as_ref()
126            .map(|s| s.to_string())
127            .unwrap_or_else(|| "risingwave".to_owned())
128    }
129
130    /// For both V1 and V2.
131    fn build_jni_catalog_configs(
132        &self,
133        java_catalog_props: &HashMap<String, String>,
134    ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
135        let mut iceberg_configs = HashMap::new();
136        let enable_config_load = self.enable_config_load.unwrap_or(false);
137        let file_io_props = {
138            let catalog_type = self.catalog_type().to_owned();
139
140            if let Some(region) = &self.region {
141                // iceberg-rust
142                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
143            }
144
145            if let Some(endpoint) = &self.endpoint {
146                // iceberg-rust
147                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
148            }
149
150            // iceberg-rust
151            if let Some(access_key) = &self.access_key {
152                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
153            }
154            if let Some(secret_key) = &self.secret_key {
155                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
156            }
157            if let Some(gcs_credential) = &self.gcs_credential {
158                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
159                if catalog_type != "rest" && catalog_type != "rest_rust" {
160                    bail!("gcs unsupported in {} catalog", &catalog_type);
161                }
162            }
163
164            match &self.warehouse_path {
165                Some(warehouse_path) => {
166                    let (bucket, _) = {
167                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
168                        let url = Url::parse(warehouse_path);
169                        if (url.is_err() || is_s3_tables)
170                            && (catalog_type == "rest" || catalog_type == "rest_rust")
171                        {
172                            // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
173                            // Or it could be a s3tables path, which is not a valid URL but a valid warehouse path,
174                            // so we allow it to pass here.
175                            (None, None)
176                        } else {
177                            let url = url.with_context(|| {
178                                format!("Invalid warehouse path: {}", warehouse_path)
179                            })?;
180                            let bucket = url
181                                .host_str()
182                                .with_context(|| {
183                                    format!(
184                                        "Invalid s3 path: {}, bucket is missing",
185                                        warehouse_path
186                                    )
187                                })?
188                                .to_owned();
189                            let root = url.path().trim_start_matches('/').to_owned();
190                            (Some(bucket), Some(root))
191                        }
192                    };
193
194                    if let Some(bucket) = bucket {
195                        iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
196                    }
197                }
198                None => {
199                    if catalog_type != "rest" && catalog_type != "rest_rust" {
200                        bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
201                    }
202                }
203            }
204            iceberg_configs.insert(
205                S3_DISABLE_CONFIG_LOAD.to_owned(),
206                (!enable_config_load).to_string(),
207            );
208
209            iceberg_configs.insert(
210                GCS_DISABLE_CONFIG_LOAD.to_owned(),
211                (!enable_config_load).to_string(),
212            );
213
214            iceberg_configs
215        };
216
217        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
218        let mut java_catalog_configs = HashMap::new();
219        {
220            if let Some(uri) = self.catalog_uri.as_deref() {
221                java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
222            }
223
224            if let Some(warehouse_path) = &self.warehouse_path {
225                java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
226            }
227            java_catalog_configs.extend(java_catalog_props.clone());
228
229            // Currently we only support s3, so let's set it to s3
230            java_catalog_configs.insert(
231                "io-impl".to_owned(),
232                "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
233            );
234
235            // suppress log of S3FileIO like: Unclosed S3FileIO instance created by...
236            java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
237
238            if let Some(region) = &self.region {
239                java_catalog_configs.insert("client.region".to_owned(), region.clone());
240            }
241            if let Some(endpoint) = &self.endpoint {
242                java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
243            }
244
245            if let Some(access_key) = &self.access_key {
246                java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
247            }
248            if let Some(secret_key) = &self.secret_key {
249                java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
250            }
251
252            if let Some(path_style_access) = self.path_style_access {
253                java_catalog_configs.insert(
254                    "s3.path-style-access".to_owned(),
255                    path_style_access.to_string(),
256                );
257            }
258
259            match self.catalog_type.as_deref() {
260                Some("rest") => {
261                    if let Some(credential) = &self.credential {
262                        java_catalog_configs.insert("credential".to_owned(), credential.clone());
263                    }
264                    if let Some(token) = &self.token {
265                        java_catalog_configs.insert("token".to_owned(), token.clone());
266                    }
267                    if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
268                        java_catalog_configs
269                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
270                    }
271                    if let Some(scope) = &self.scope {
272                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
273                    }
274                    if let Some(rest_signing_region) = &self.rest_signing_region {
275                        java_catalog_configs.insert(
276                            "rest.signing-region".to_owned(),
277                            rest_signing_region.clone(),
278                        );
279                    }
280                    if let Some(rest_signing_name) = &self.rest_signing_name {
281                        java_catalog_configs
282                            .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
283                    }
284                    if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
285                        java_catalog_configs.insert(
286                            "rest.sigv4-enabled".to_owned(),
287                            rest_sigv4_enabled.to_string(),
288                        );
289
290                        if let Some(access_key) = &self.access_key {
291                            java_catalog_configs
292                                .insert("rest.access-key-id".to_owned(), access_key.clone());
293                        }
294
295                        if let Some(secret_key) = &self.secret_key {
296                            java_catalog_configs
297                                .insert("rest.secret-access-key".to_owned(), secret_key.clone());
298                        }
299                    }
300                }
301                Some("glue") => {
302                    if !enable_config_load {
303                        java_catalog_configs.insert(
304                            "client.credentials-provider".to_owned(),
305                            "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
306                        );
307                        // Use S3 ak/sk and region as glue ak/sk and region by default.
308                        // TODO: use different ak/sk and region for s3 and glue.
309                        if let Some(access_key) = &self.access_key {
310                            java_catalog_configs.insert(
311                                "client.credentials-provider.glue.access-key-id".to_owned(),
312                                access_key.clone(),
313                            );
314                        }
315                        if let Some(secret_key) = &self.secret_key {
316                            java_catalog_configs.insert(
317                                "client.credentials-provider.glue.secret-access-key".to_owned(),
318                                secret_key.clone(),
319                            );
320                        }
321                    }
322
323                    if let Some(region) = &self.region {
324                        java_catalog_configs.insert("client.region".to_owned(), region.clone());
325                        java_catalog_configs.insert(
326                            "glue.endpoint".to_owned(),
327                            format!("https://glue.{}.amazonaws.com", region),
328                        );
329                    }
330
331                    if let Some(glue_id) = self.glue_id.as_deref() {
332                        java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
333                    }
334                }
335                _ => {}
336            }
337        }
338
339        Ok((file_io_props, java_catalog_configs))
340    }
341}
342
343impl IcebergCommon {
344    pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
345        let ret = if let Some(database_name) = &self.database_name {
346            TableIdent::from_strs(vec![database_name, &self.table_name])
347        } else {
348            TableIdent::from_strs(vec![&self.table_name])
349        };
350
351        Ok(ret.context("Failed to create table identifier")?)
352    }
353
354    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
355    pub async fn create_catalog(
356        &self,
357        java_catalog_props: &HashMap<String, String>,
358    ) -> ConnectorResult<Arc<dyn Catalog>> {
359        match self.catalog_type() {
360            "storage" => {
361                let warehouse = self
362                    .warehouse_path
363                    .clone()
364                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
365                let url = Url::parse(warehouse.as_ref())
366                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
367
368                let config = match url.scheme() {
369                    "s3" | "s3a" => StorageCatalogConfig::S3(
370                        storage_catalog::StorageCatalogS3Config::builder()
371                            .warehouse(warehouse)
372                            .access_key(self.access_key.clone())
373                            .secret_key(self.secret_key.clone())
374                            .region(self.region.clone())
375                            .endpoint(self.endpoint.clone())
376                            .enable_config_load(self.enable_config_load)
377                            .build(),
378                    ),
379                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
380                        storage_catalog::StorageCatalogGcsConfig::builder()
381                            .warehouse(warehouse)
382                            .credential(self.gcs_credential.clone())
383                            .enable_config_load(self.enable_config_load)
384                            .build(),
385                    ),
386                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
387                };
388
389                let catalog = storage_catalog::StorageCatalog::new(config)?;
390                Ok(Arc::new(catalog))
391            }
392            "rest_rust" => {
393                let mut iceberg_configs = HashMap::new();
394
395                // check gcs credential or s3 access key and secret key
396                if let Some(gcs_credential) = &self.gcs_credential {
397                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
398                } else {
399                    if let Some(region) = &self.region {
400                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
401                    }
402                    if let Some(endpoint) = &self.endpoint {
403                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
404                    }
405                    if let Some(access_key) = &self.access_key {
406                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
407                    }
408                    if let Some(secret_key) = &self.secret_key {
409                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
410                    }
411                };
412
413                if let Some(credential) = &self.credential {
414                    iceberg_configs.insert("credential".to_owned(), credential.clone());
415                }
416                if let Some(token) = &self.token {
417                    iceberg_configs.insert("token".to_owned(), token.clone());
418                }
419                if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
420                    iceberg_configs
421                        .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
422                }
423                if let Some(scope) = &self.scope {
424                    iceberg_configs.insert("scope".to_owned(), scope.clone());
425                }
426
427                let config_builder =
428                    iceberg_catalog_rest::RestCatalogConfig::builder()
429                        .uri(self.catalog_uri.clone().with_context(|| {
430                            "`catalog.uri` must be set in rest catalog".to_owned()
431                        })?)
432                        .props(iceberg_configs);
433
434                let config = match &self.warehouse_path {
435                    Some(warehouse_path) => {
436                        config_builder.warehouse(warehouse_path.clone()).build()
437                    }
438                    None => config_builder.build(),
439                };
440                let catalog = iceberg_catalog_rest::RestCatalog::new(config);
441                Ok(Arc::new(catalog))
442            }
443            "glue_rust" => {
444                let mut iceberg_configs = HashMap::new();
445                // glue
446                if let Some(region) = &self.region {
447                    iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
448                }
449                if let Some(access_key) = &self.access_key {
450                    iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
451                }
452                if let Some(secret_key) = &self.secret_key {
453                    iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
454                }
455                // s3
456                if let Some(region) = &self.region {
457                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
458                }
459                if let Some(endpoint) = &self.endpoint {
460                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
461                }
462                if let Some(access_key) = &self.access_key {
463                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
464                }
465                if let Some(secret_key) = &self.secret_key {
466                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
467                }
468                let config_builder =
469                    iceberg_catalog_glue::GlueCatalogConfig::builder()
470                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
471                            anyhow!("`warehouse.path` must be set in glue catalog")
472                        })?)
473                        .props(iceberg_configs);
474                let config = if let Some(uri) = self.catalog_uri.as_deref() {
475                    config_builder.uri(uri.to_owned()).build()
476                } else {
477                    config_builder.build()
478                };
479                let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
480                Ok(Arc::new(catalog))
481            }
482            catalog_type
483                if catalog_type == "hive"
484                    || catalog_type == "snowflake"
485                    || catalog_type == "jdbc"
486                    || catalog_type == "rest"
487                    || catalog_type == "glue" =>
488            {
489                // Create java catalog
490                let (file_io_props, java_catalog_props) =
491                    self.build_jni_catalog_configs(java_catalog_props)?;
492                let catalog_impl = match catalog_type {
493                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
494                    "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
495                    "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
496                    "rest" => "org.apache.iceberg.rest.RESTCatalog",
497                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
498                    _ => unreachable!(),
499                };
500
501                jni_catalog::JniCatalog::build_catalog(
502                    file_io_props,
503                    self.catalog_name(),
504                    catalog_impl,
505                    java_catalog_props,
506                )
507            }
508            "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
509            _ => {
510                bail!(
511                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
512                    self.catalog_type()
513                )
514            }
515        }
516    }
517
518    /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
519    pub async fn load_table(
520        &self,
521        java_catalog_props: &HashMap<String, String>,
522    ) -> ConnectorResult<Table> {
523        let catalog = self
524            .create_catalog(java_catalog_props)
525            .await
526            .context("Unable to load iceberg catalog")?;
527
528        let table_id = self
529            .full_table_name()
530            .context("Unable to parse table name")?;
531
532        catalog.load_table(&table_id).await.map_err(Into::into)
533    }
534}