1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
24use ::iceberg::table::Table;
25use ::iceberg::{Catalog, TableIdent};
26use anyhow::{Context, anyhow};
27use iceberg::io::{
28 AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
29 GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD,
30};
31use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
32use phf::{Set, phf_set};
33use risingwave_common::bail;
34use risingwave_common::util::env_var::env_var_is_true;
35use serde_derive::Deserialize;
36use serde_with::serde_as;
37use url::Url;
38use with_options::WithOptions;
39
40use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
41use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
42use crate::deserialize_optional_bool_from_string;
43use crate::enforce_secret::EnforceSecret;
44use crate::error::ConnectorResult;
45
/// Iceberg catalog, storage and table options, deserialized from string-keyed properties.
///
/// Every field is populated from the property named in its `serde(rename = ...)` attribute
/// (or from the field name itself when there is no rename).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type. When unset, [`IcebergCommon::catalog_type`] falls back to `"storage"`.
    /// `create_catalog` accepts `storage`, `rest`, `rest_rust`, `glue`, `glue_rust`, `hive`,
    /// `jdbc`, `snowflake` and `mock`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also forwarded as the AWS client region for the Glue catalogs.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    /// S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    /// S3 access key id. Also reused as the AWS access key for the Glue catalogs and, when
    /// `catalog.rest.sigv4_enabled` is set, for the Java REST catalog.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    /// S3 secret access key. Reused in the same places as `access_key`.
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    /// GCS credential, forwarded under `GCS_CREDENTIALS_JSON`. Rejected for Java catalogs
    /// other than `rest`.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// Warehouse location. Required by the `storage` and `glue_rust` catalogs and by every
    /// Java catalog except `rest`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Forwarded as `glue.id` to the Java Glue catalog.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,
    /// Catalog name. When unset, [`IcebergCommon::catalog_name`] falls back to `"risingwave"`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog URI. Required by `rest_rust`, optional for `glue_rust`, and forwarded as `uri`
    /// to Java catalogs.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Optional first component of the table identifier built by `full_table_name`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name; the only mandatory property in this struct.
    #[serde(rename = "table.name")]
    pub table_name: String,
    /// Forwarded as `credential`, only to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// Forwarded as `token`, only to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// Forwarded as `oauth2-server-uri`, only to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub oauth2_server_uri: Option<String>,
    /// Forwarded as `scope`, only to the `rest` and `rest_rust` catalogs.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// Forwarded as `rest.signing-region` to the Java REST catalog.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// Forwarded as `rest.signing-name` to the Java REST catalog.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Forwarded as `rest.sigv4-enabled` to the Java REST catalog. Whenever this is set (to
    /// either value) the S3 access/secret keys are forwarded as `rest.*` credentials as well.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Forwarded as `s3.path-style-access` to Java catalogs.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,
    /// Whether storage clients may load credentials/configuration on their own. Read through
    /// [`IcebergCommon::enable_config_load`], which treats unset as `false` and always returns
    /// `false` when the `DISABLE_DEFAULT_CREDENTIAL` environment variable is true.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// Not read by any method in this file.
    // NOTE(review): presumably consumed by callers elsewhere in the crate; confirm.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,
}
141
142impl EnforceSecret for IcebergCommon {
143 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
144 "s3.access.key",
145 "s3.secret.key",
146 "gcs.credential",
147 "catalog.credential",
148 "catalog.token",
149 "catalog.oauth2_server_uri",
150 };
151}
152
153impl IcebergCommon {
154 pub fn catalog_type(&self) -> &str {
155 self.catalog_type.as_deref().unwrap_or("storage")
156 }
157
158 pub fn catalog_name(&self) -> String {
159 self.catalog_name
160 .as_ref()
161 .map(|s| s.to_string())
162 .unwrap_or_else(|| "risingwave".to_owned())
163 }
164
165 pub fn enable_config_load(&self) -> bool {
166 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
168 return false;
169 }
170 self.enable_config_load.unwrap_or(false)
171 }
172
173 fn build_jni_catalog_configs(
175 &self,
176 java_catalog_props: &HashMap<String, String>,
177 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
178 let mut iceberg_configs = HashMap::new();
179 let enable_config_load = self.enable_config_load();
180 let file_io_props = {
181 let catalog_type = self.catalog_type().to_owned();
182
183 if let Some(region) = &self.region {
184 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
186 }
187
188 if let Some(endpoint) = &self.endpoint {
189 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
191 }
192
193 if let Some(access_key) = &self.access_key {
195 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
196 }
197 if let Some(secret_key) = &self.secret_key {
198 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
199 }
200 if let Some(gcs_credential) = &self.gcs_credential {
201 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
202 if catalog_type != "rest" && catalog_type != "rest_rust" {
203 bail!("gcs unsupported in {} catalog", &catalog_type);
204 }
205 }
206
207 if let (
208 Some(azblob_account_name),
209 Some(azblob_account_key),
210 Some(azblob_endpoint_url),
211 ) = (
212 &self.azblob_account_name,
213 &self.azblob_account_key,
214 &self.azblob_endpoint_url,
215 ) {
216 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
217 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
218 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
219
220 if catalog_type != "rest" && catalog_type != "rest_rust" {
221 bail!("azblob unsupported in {} catalog", &catalog_type);
222 }
223 }
224
225 match &self.warehouse_path {
226 Some(warehouse_path) => {
227 let (bucket, _) = {
228 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
229 let url = Url::parse(warehouse_path);
230 if (url.is_err() || is_s3_tables)
231 && (catalog_type == "rest" || catalog_type == "rest_rust")
232 {
233 (None, None)
237 } else {
238 let url = url.with_context(|| {
239 format!("Invalid warehouse path: {}", warehouse_path)
240 })?;
241 let bucket = url
242 .host_str()
243 .with_context(|| {
244 format!(
245 "Invalid s3 path: {}, bucket is missing",
246 warehouse_path
247 )
248 })?
249 .to_owned();
250 let root = url.path().trim_start_matches('/').to_owned();
251 (Some(bucket), Some(root))
252 }
253 };
254
255 if let Some(bucket) = bucket {
256 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
257 }
258 }
259 None => {
260 if catalog_type != "rest" && catalog_type != "rest_rust" {
261 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
262 }
263 }
264 }
265 iceberg_configs.insert(
266 S3_DISABLE_CONFIG_LOAD.to_owned(),
267 (!enable_config_load).to_string(),
268 );
269
270 iceberg_configs.insert(
271 GCS_DISABLE_CONFIG_LOAD.to_owned(),
272 (!enable_config_load).to_string(),
273 );
274
275 iceberg_configs
276 };
277
278 let mut java_catalog_configs = HashMap::new();
280 {
281 if let Some(uri) = self.catalog_uri.as_deref() {
282 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
283 }
284
285 if let Some(warehouse_path) = &self.warehouse_path {
286 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
287 }
288 java_catalog_configs.extend(java_catalog_props.clone());
289
290 java_catalog_configs.insert(
292 "io-impl".to_owned(),
293 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
294 );
295
296 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
298
299 if let Some(region) = &self.region {
300 java_catalog_configs.insert("client.region".to_owned(), region.clone());
301 }
302 if let Some(endpoint) = &self.endpoint {
303 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
304 }
305
306 if let Some(access_key) = &self.access_key {
307 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
308 }
309 if let Some(secret_key) = &self.secret_key {
310 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
311 }
312
313 if let Some(path_style_access) = self.path_style_access {
314 java_catalog_configs.insert(
315 "s3.path-style-access".to_owned(),
316 path_style_access.to_string(),
317 );
318 }
319
320 match self.catalog_type.as_deref() {
321 Some("rest") => {
322 if let Some(credential) = &self.credential {
323 java_catalog_configs.insert("credential".to_owned(), credential.clone());
324 }
325 if let Some(token) = &self.token {
326 java_catalog_configs.insert("token".to_owned(), token.clone());
327 }
328 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
329 java_catalog_configs
330 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
331 }
332 if let Some(scope) = &self.scope {
333 java_catalog_configs.insert("scope".to_owned(), scope.clone());
334 }
335 if let Some(rest_signing_region) = &self.rest_signing_region {
336 java_catalog_configs.insert(
337 "rest.signing-region".to_owned(),
338 rest_signing_region.clone(),
339 );
340 }
341 if let Some(rest_signing_name) = &self.rest_signing_name {
342 java_catalog_configs
343 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
344 }
345 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
346 java_catalog_configs.insert(
347 "rest.sigv4-enabled".to_owned(),
348 rest_sigv4_enabled.to_string(),
349 );
350
351 if let Some(access_key) = &self.access_key {
352 java_catalog_configs
353 .insert("rest.access-key-id".to_owned(), access_key.clone());
354 }
355
356 if let Some(secret_key) = &self.secret_key {
357 java_catalog_configs
358 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
359 }
360 }
361 }
362 Some("glue") => {
363 if !enable_config_load {
364 java_catalog_configs.insert(
365 "client.credentials-provider".to_owned(),
366 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
367 );
368 if let Some(access_key) = &self.access_key {
371 java_catalog_configs.insert(
372 "client.credentials-provider.glue.access-key-id".to_owned(),
373 access_key.clone(),
374 );
375 }
376 if let Some(secret_key) = &self.secret_key {
377 java_catalog_configs.insert(
378 "client.credentials-provider.glue.secret-access-key".to_owned(),
379 secret_key.clone(),
380 );
381 }
382 }
383
384 if let Some(region) = &self.region {
385 java_catalog_configs.insert("client.region".to_owned(), region.clone());
386 java_catalog_configs.insert(
387 "glue.endpoint".to_owned(),
388 format!("https://glue.{}.amazonaws.com", region),
389 );
390 }
391
392 if let Some(glue_id) = self.glue_id.as_deref() {
393 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
394 }
395 }
396 _ => {}
397 }
398 }
399
400 Ok((file_io_props, java_catalog_configs))
401 }
402}
403
404impl IcebergCommon {
405 pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
406 let ret = if let Some(database_name) = &self.database_name {
407 TableIdent::from_strs(vec![database_name, &self.table_name])
408 } else {
409 TableIdent::from_strs(vec![&self.table_name])
410 };
411
412 Ok(ret.context("Failed to create table identifier")?)
413 }
414
415 pub async fn create_catalog(
417 &self,
418 java_catalog_props: &HashMap<String, String>,
419 ) -> ConnectorResult<Arc<dyn Catalog>> {
420 match self.catalog_type() {
421 "storage" => {
422 let warehouse = self
423 .warehouse_path
424 .clone()
425 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
426 let url = Url::parse(warehouse.as_ref())
427 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
428
429 let config = match url.scheme() {
430 "s3" | "s3a" => StorageCatalogConfig::S3(
431 storage_catalog::StorageCatalogS3Config::builder()
432 .warehouse(warehouse)
433 .access_key(self.access_key.clone())
434 .secret_key(self.secret_key.clone())
435 .region(self.region.clone())
436 .endpoint(self.endpoint.clone())
437 .enable_config_load(Some(self.enable_config_load()))
438 .build(),
439 ),
440 "gs" | "gcs" => StorageCatalogConfig::Gcs(
441 storage_catalog::StorageCatalogGcsConfig::builder()
442 .warehouse(warehouse)
443 .credential(self.gcs_credential.clone())
444 .enable_config_load(Some(self.enable_config_load()))
445 .build(),
446 ),
447 "azblob" => StorageCatalogConfig::Azblob(
448 storage_catalog::StorageCatalogAzblobConfig::builder()
449 .warehouse(warehouse)
450 .account_name(self.azblob_account_name.clone())
451 .account_key(self.azblob_account_key.clone())
452 .endpoint(self.azblob_endpoint_url.clone())
453 .build(),
454 ),
455 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
456 };
457
458 let catalog = storage_catalog::StorageCatalog::new(config)?;
459 Ok(Arc::new(catalog))
460 }
461 "rest_rust" => {
462 let mut iceberg_configs = HashMap::new();
463
464 if let Some(gcs_credential) = &self.gcs_credential {
466 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
467 } else {
468 if let Some(region) = &self.region {
469 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
470 }
471 if let Some(endpoint) = &self.endpoint {
472 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
473 }
474 if let Some(access_key) = &self.access_key {
475 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
476 }
477 if let Some(secret_key) = &self.secret_key {
478 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
479 }
480 };
481
482 if let Some(credential) = &self.credential {
483 iceberg_configs.insert("credential".to_owned(), credential.clone());
484 }
485 if let Some(token) = &self.token {
486 iceberg_configs.insert("token".to_owned(), token.clone());
487 }
488 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
489 iceberg_configs
490 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
491 }
492 if let Some(scope) = &self.scope {
493 iceberg_configs.insert("scope".to_owned(), scope.clone());
494 }
495
496 let config_builder =
497 iceberg_catalog_rest::RestCatalogConfig::builder()
498 .uri(self.catalog_uri.clone().with_context(|| {
499 "`catalog.uri` must be set in rest catalog".to_owned()
500 })?)
501 .props(iceberg_configs);
502
503 let config = match &self.warehouse_path {
504 Some(warehouse_path) => {
505 config_builder.warehouse(warehouse_path.clone()).build()
506 }
507 None => config_builder.build(),
508 };
509 let catalog = iceberg_catalog_rest::RestCatalog::new(config);
510 Ok(Arc::new(catalog))
511 }
512 "glue_rust" => {
513 let mut iceberg_configs = HashMap::new();
514 if let Some(region) = &self.region {
516 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
517 }
518 if let Some(access_key) = &self.access_key {
519 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
520 }
521 if let Some(secret_key) = &self.secret_key {
522 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
523 }
524 if let Some(region) = &self.region {
526 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
527 }
528 if let Some(endpoint) = &self.endpoint {
529 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
530 }
531 if let Some(access_key) = &self.access_key {
532 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
533 }
534 if let Some(secret_key) = &self.secret_key {
535 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
536 }
537 let config_builder =
538 iceberg_catalog_glue::GlueCatalogConfig::builder()
539 .warehouse(self.warehouse_path.clone().ok_or_else(|| {
540 anyhow!("`warehouse.path` must be set in glue catalog")
541 })?)
542 .props(iceberg_configs);
543 let config = if let Some(uri) = self.catalog_uri.as_deref() {
544 config_builder.uri(uri.to_owned()).build()
545 } else {
546 config_builder.build()
547 };
548 let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
549 Ok(Arc::new(catalog))
550 }
551 catalog_type
552 if catalog_type == "hive"
553 || catalog_type == "snowflake"
554 || catalog_type == "jdbc"
555 || catalog_type == "rest"
556 || catalog_type == "glue" =>
557 {
558 let (file_io_props, java_catalog_props) =
560 self.build_jni_catalog_configs(java_catalog_props)?;
561 let catalog_impl = match catalog_type {
562 "hive" => "org.apache.iceberg.hive.HiveCatalog",
563 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
564 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
565 "rest" => "org.apache.iceberg.rest.RESTCatalog",
566 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
567 _ => unreachable!(),
568 };
569
570 jni_catalog::JniCatalog::build_catalog(
571 file_io_props,
572 self.catalog_name(),
573 catalog_impl,
574 java_catalog_props,
575 )
576 }
577 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
578 _ => {
579 bail!(
580 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
581 self.catalog_type()
582 )
583 }
584 }
585 }
586
587 pub async fn load_table(
589 &self,
590 java_catalog_props: &HashMap<String, String>,
591 ) -> ConnectorResult<Table> {
592 let catalog = self
593 .create_catalog(java_catalog_props)
594 .await
595 .context("Unable to load iceberg catalog")?;
596
597 let table_id = self
598 .full_table_name()
599 .context("Unable to parse table name")?;
600
601 catalog.load_table(&table_id).await.map_err(Into::into)
602 }
603}