1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
24use ::iceberg::table::Table;
25use ::iceberg::{Catalog, TableIdent};
26use anyhow::{Context, anyhow};
27use iceberg::io::{
28 AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
29 GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
30};
31use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
32use phf::{Set, phf_set};
33use risingwave_common::bail;
34use risingwave_common::util::env_var::env_var_is_true;
35use serde::Deserialize;
36use serde_with::serde_as;
37use url::Url;
38use with_options::WithOptions;
39
40use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
41use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
42use crate::deserialize_optional_bool_from_string;
43use crate::enforce_secret::EnforceSecret;
44use crate::error::ConnectorResult;
45
46#[serde_as]
47#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
48pub struct IcebergCommon {
49 #[serde(rename = "catalog.type")]
52 pub catalog_type: Option<String>,
53 #[serde(rename = "s3.region")]
54 pub region: Option<String>,
55 #[serde(rename = "s3.endpoint")]
56 pub endpoint: Option<String>,
57 #[serde(rename = "s3.access.key")]
58 pub access_key: Option<String>,
59 #[serde(rename = "s3.secret.key")]
60 pub secret_key: Option<String>,
61
62 #[serde(rename = "gcs.credential")]
63 pub gcs_credential: Option<String>,
64
65 #[serde(rename = "azblob.account_name")]
66 pub azblob_account_name: Option<String>,
67 #[serde(rename = "azblob.account_key")]
68 pub azblob_account_key: Option<String>,
69 #[serde(rename = "azblob.endpoint_url")]
70 pub azblob_endpoint_url: Option<String>,
71
72 #[serde(rename = "warehouse.path")]
74 pub warehouse_path: Option<String>,
75 #[serde(rename = "glue.id")]
78 pub glue_id: Option<String>,
79 #[serde(rename = "catalog.name")]
81 pub catalog_name: Option<String>,
82 #[serde(rename = "catalog.uri")]
84 pub catalog_uri: Option<String>,
85 #[serde(rename = "database.name")]
86 pub database_name: Option<String>,
87 #[serde(rename = "table.name")]
89 pub table_name: String,
90 #[serde(rename = "catalog.credential")]
93 pub credential: Option<String>,
94 #[serde(rename = "catalog.token")]
97 pub token: Option<String>,
98 #[serde(rename = "catalog.oauth2_server_uri")]
101 pub oauth2_server_uri: Option<String>,
102 #[serde(rename = "catalog.scope")]
105 pub scope: Option<String>,
106
107 #[serde(rename = "catalog.rest.signing_region")]
109 pub rest_signing_region: Option<String>,
110
111 #[serde(rename = "catalog.rest.signing_name")]
113 pub rest_signing_name: Option<String>,
114
115 #[serde(
117 rename = "catalog.rest.sigv4_enabled",
118 default,
119 deserialize_with = "deserialize_optional_bool_from_string"
120 )]
121 pub rest_sigv4_enabled: Option<bool>,
122
123 #[serde(
124 rename = "s3.path.style.access",
125 default,
126 deserialize_with = "deserialize_optional_bool_from_string"
127 )]
128 pub path_style_access: Option<bool>,
129 #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
131 pub enable_config_load: Option<bool>,
132
133 #[serde(
135 rename = "hosted_catalog",
136 default,
137 deserialize_with = "deserialize_optional_bool_from_string"
138 )]
139 pub hosted_catalog: Option<bool>,
140
141 #[serde(rename = "catalog.header")]
148 pub header: Option<String>,
149}
150
151impl EnforceSecret for IcebergCommon {
152 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
153 "s3.access.key",
154 "s3.secret.key",
155 "gcs.credential",
156 "catalog.credential",
157 "catalog.token",
158 "catalog.oauth2_server_uri",
159 };
160}
161
162impl IcebergCommon {
163 pub fn catalog_type(&self) -> &str {
164 self.catalog_type.as_deref().unwrap_or("storage")
165 }
166
167 pub fn catalog_name(&self) -> String {
168 self.catalog_name
169 .as_ref()
170 .cloned()
171 .unwrap_or_else(|| "risingwave".to_owned())
172 }
173
174 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
175 let mut headers = HashMap::new();
176 headers.insert("User-Agent".to_owned(), "RisingWave".to_owned());
177 if let Some(header) = &self.header {
178 for pair in header.split(';') {
179 let mut parts = pair.split('=');
180 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
181 headers.insert(key.to_owned(), value.to_owned());
182 } else {
183 bail!("Invalid header format: {}", pair);
184 }
185 }
186 }
187 Ok(headers)
188 }
189
190 pub fn enable_config_load(&self) -> bool {
191 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
193 return false;
194 }
195 self.enable_config_load.unwrap_or(false)
196 }
197
198 fn build_jni_catalog_configs(
200 &self,
201 java_catalog_props: &HashMap<String, String>,
202 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
203 let mut iceberg_configs = HashMap::new();
204 let enable_config_load = self.enable_config_load();
205 let file_io_props = {
206 let catalog_type = self.catalog_type().to_owned();
207
208 if let Some(region) = &self.region {
209 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
211 }
212
213 if let Some(endpoint) = &self.endpoint {
214 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
216 }
217
218 if let Some(access_key) = &self.access_key {
220 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
221 }
222 if let Some(secret_key) = &self.secret_key {
223 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
224 }
225 if let Some(gcs_credential) = &self.gcs_credential {
226 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
227 if catalog_type != "rest" && catalog_type != "rest_rust" {
228 bail!("gcs unsupported in {} catalog", &catalog_type);
229 }
230 }
231
232 if let (
233 Some(azblob_account_name),
234 Some(azblob_account_key),
235 Some(azblob_endpoint_url),
236 ) = (
237 &self.azblob_account_name,
238 &self.azblob_account_key,
239 &self.azblob_endpoint_url,
240 ) {
241 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
242 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
243 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
244
245 if catalog_type != "rest" && catalog_type != "rest_rust" {
246 bail!("azblob unsupported in {} catalog", &catalog_type);
247 }
248 }
249
250 match &self.warehouse_path {
251 Some(warehouse_path) => {
252 let (bucket, _) = {
253 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
254 let url = Url::parse(warehouse_path);
255 if (url.is_err() || is_s3_tables)
256 && (catalog_type == "rest" || catalog_type == "rest_rust")
257 {
258 (None, None)
262 } else {
263 let url = url.with_context(|| {
264 format!("Invalid warehouse path: {}", warehouse_path)
265 })?;
266 let bucket = url
267 .host_str()
268 .with_context(|| {
269 format!(
270 "Invalid s3 path: {}, bucket is missing",
271 warehouse_path
272 )
273 })?
274 .to_owned();
275 let root = url.path().trim_start_matches('/').to_owned();
276 (Some(bucket), Some(root))
277 }
278 };
279
280 if let Some(bucket) = bucket {
281 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
282 }
283 }
284 None => {
285 if catalog_type != "rest" && catalog_type != "rest_rust" {
286 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
287 }
288 }
289 }
290 iceberg_configs.insert(
291 S3_DISABLE_CONFIG_LOAD.to_owned(),
292 (!enable_config_load).to_string(),
293 );
294
295 iceberg_configs.insert(
296 GCS_DISABLE_CONFIG_LOAD.to_owned(),
297 (!enable_config_load).to_string(),
298 );
299
300 if let Some(path_style_access) = self.path_style_access {
301 iceberg_configs.insert(
302 S3_PATH_STYLE_ACCESS.to_owned(),
303 path_style_access.to_string(),
304 );
305 }
306
307 iceberg_configs
308 };
309
310 let mut java_catalog_configs = HashMap::new();
312 {
313 if let Some(uri) = self.catalog_uri.as_deref() {
314 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
315 }
316
317 if let Some(warehouse_path) = &self.warehouse_path {
318 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
319 }
320 java_catalog_configs.extend(java_catalog_props.clone());
321
322 java_catalog_configs.insert(
324 "io-impl".to_owned(),
325 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
326 );
327
328 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
330
331 if let Some(region) = &self.region {
332 java_catalog_configs.insert("client.region".to_owned(), region.clone());
333 }
334 if let Some(endpoint) = &self.endpoint {
335 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
336 }
337
338 if let Some(access_key) = &self.access_key {
339 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
340 }
341 if let Some(secret_key) = &self.secret_key {
342 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
343 }
344
345 if let Some(path_style_access) = &self.path_style_access {
346 java_catalog_configs.insert(
347 "s3.path-style-access".to_owned(),
348 path_style_access.to_string(),
349 );
350 }
351
352 let headers = self.headers()?;
353 for (header_name, header_value) in headers {
354 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
355 }
356
357 match self.catalog_type.as_deref() {
358 Some("rest") => {
359 if let Some(credential) = &self.credential {
360 java_catalog_configs.insert("credential".to_owned(), credential.clone());
361 }
362 if let Some(token) = &self.token {
363 java_catalog_configs.insert("token".to_owned(), token.clone());
364 }
365 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
366 java_catalog_configs
367 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
368 }
369 if let Some(scope) = &self.scope {
370 java_catalog_configs.insert("scope".to_owned(), scope.clone());
371 }
372 if let Some(rest_signing_region) = &self.rest_signing_region {
373 java_catalog_configs.insert(
374 "rest.signing-region".to_owned(),
375 rest_signing_region.clone(),
376 );
377 }
378 if let Some(rest_signing_name) = &self.rest_signing_name {
379 java_catalog_configs
380 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
381 }
382 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
383 java_catalog_configs.insert(
384 "rest.sigv4-enabled".to_owned(),
385 rest_sigv4_enabled.to_string(),
386 );
387
388 if let Some(access_key) = &self.access_key {
389 java_catalog_configs
390 .insert("rest.access-key-id".to_owned(), access_key.clone());
391 }
392
393 if let Some(secret_key) = &self.secret_key {
394 java_catalog_configs
395 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
396 }
397 }
398 }
399 Some("glue") => {
400 if !enable_config_load {
401 java_catalog_configs.insert(
402 "client.credentials-provider".to_owned(),
403 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
404 );
405 if let Some(access_key) = &self.access_key {
408 java_catalog_configs.insert(
409 "client.credentials-provider.glue.access-key-id".to_owned(),
410 access_key.clone(),
411 );
412 }
413 if let Some(secret_key) = &self.secret_key {
414 java_catalog_configs.insert(
415 "client.credentials-provider.glue.secret-access-key".to_owned(),
416 secret_key.clone(),
417 );
418 }
419 }
420
421 if let Some(region) = &self.region {
422 java_catalog_configs.insert("client.region".to_owned(), region.clone());
423 java_catalog_configs.insert(
424 "glue.endpoint".to_owned(),
425 format!("https://glue.{}.amazonaws.com", region),
426 );
427 }
428
429 if let Some(glue_id) = self.glue_id.as_deref() {
430 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
431 }
432 }
433 _ => {}
434 }
435 }
436
437 Ok((file_io_props, java_catalog_configs))
438 }
439}
440
441impl IcebergCommon {
442 pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
443 let ret = if let Some(database_name) = &self.database_name {
444 TableIdent::from_strs(vec![database_name, &self.table_name])
445 } else {
446 TableIdent::from_strs(vec![&self.table_name])
447 };
448
449 Ok(ret.context("Failed to create table identifier")?)
450 }
451
452 pub async fn create_catalog(
454 &self,
455 java_catalog_props: &HashMap<String, String>,
456 ) -> ConnectorResult<Arc<dyn Catalog>> {
457 match self.catalog_type() {
458 "storage" => {
459 let warehouse = self
460 .warehouse_path
461 .clone()
462 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
463 let url = Url::parse(warehouse.as_ref())
464 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
465
466 let config = match url.scheme() {
467 "s3" | "s3a" => StorageCatalogConfig::S3(
468 storage_catalog::StorageCatalogS3Config::builder()
469 .warehouse(warehouse)
470 .access_key(self.access_key.clone())
471 .secret_key(self.secret_key.clone())
472 .region(self.region.clone())
473 .endpoint(self.endpoint.clone())
474 .path_style_access(self.path_style_access)
475 .enable_config_load(Some(self.enable_config_load()))
476 .build(),
477 ),
478 "gs" | "gcs" => StorageCatalogConfig::Gcs(
479 storage_catalog::StorageCatalogGcsConfig::builder()
480 .warehouse(warehouse)
481 .credential(self.gcs_credential.clone())
482 .enable_config_load(Some(self.enable_config_load()))
483 .build(),
484 ),
485 "azblob" => StorageCatalogConfig::Azblob(
486 storage_catalog::StorageCatalogAzblobConfig::builder()
487 .warehouse(warehouse)
488 .account_name(self.azblob_account_name.clone())
489 .account_key(self.azblob_account_key.clone())
490 .endpoint(self.azblob_endpoint_url.clone())
491 .build(),
492 ),
493 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
494 };
495
496 let catalog = storage_catalog::StorageCatalog::new(config)?;
497 Ok(Arc::new(catalog))
498 }
499 "rest_rust" => {
500 let mut iceberg_configs = HashMap::new();
501
502 if let Some(gcs_credential) = &self.gcs_credential {
504 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
505 } else {
506 if let Some(region) = &self.region {
507 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
508 }
509 if let Some(endpoint) = &self.endpoint {
510 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
511 }
512 if let Some(access_key) = &self.access_key {
513 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
514 }
515 if let Some(secret_key) = &self.secret_key {
516 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
517 }
518 if let Some(path_style_access) = &self.path_style_access {
519 iceberg_configs.insert(
520 S3_PATH_STYLE_ACCESS.to_owned(),
521 path_style_access.to_string(),
522 );
523 }
524 };
525
526 if let Some(credential) = &self.credential {
527 iceberg_configs.insert("credential".to_owned(), credential.clone());
528 }
529 if let Some(token) = &self.token {
530 iceberg_configs.insert("token".to_owned(), token.clone());
531 }
532 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
533 iceberg_configs
534 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
535 }
536 if let Some(scope) = &self.scope {
537 iceberg_configs.insert("scope".to_owned(), scope.clone());
538 }
539
540 let headers = self.headers()?;
541 for (header_name, header_value) in headers {
542 iceberg_configs.insert(format!("header.{}", header_name), header_value);
543 }
544
545 let config_builder =
546 iceberg_catalog_rest::RestCatalogConfig::builder()
547 .uri(self.catalog_uri.clone().with_context(|| {
548 "`catalog.uri` must be set in rest catalog".to_owned()
549 })?)
550 .props(iceberg_configs);
551
552 let config = match &self.warehouse_path {
553 Some(warehouse_path) => {
554 config_builder.warehouse(warehouse_path.clone()).build()
555 }
556 None => config_builder.build(),
557 };
558 let catalog = iceberg_catalog_rest::RestCatalog::new(config);
559 Ok(Arc::new(catalog))
560 }
561 "glue_rust" => {
562 let mut iceberg_configs = HashMap::new();
563 if let Some(region) = &self.region {
565 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
566 }
567 if let Some(access_key) = &self.access_key {
568 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
569 }
570 if let Some(secret_key) = &self.secret_key {
571 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
572 }
573 if let Some(region) = &self.region {
575 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
576 }
577 if let Some(endpoint) = &self.endpoint {
578 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
579 }
580 if let Some(access_key) = &self.access_key {
581 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
582 }
583 if let Some(secret_key) = &self.secret_key {
584 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
585 }
586 if let Some(path_style_access) = &self.path_style_access {
587 iceberg_configs.insert(
588 S3_PATH_STYLE_ACCESS.to_owned(),
589 path_style_access.to_string(),
590 );
591 }
592 let config_builder =
593 iceberg_catalog_glue::GlueCatalogConfig::builder()
594 .warehouse(self.warehouse_path.clone().ok_or_else(|| {
595 anyhow!("`warehouse.path` must be set in glue catalog")
596 })?)
597 .props(iceberg_configs);
598 let config = if let Some(uri) = self.catalog_uri.as_deref() {
599 config_builder.uri(uri.to_owned()).build()
600 } else {
601 config_builder.build()
602 };
603 let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
604 Ok(Arc::new(catalog))
605 }
606 catalog_type
607 if catalog_type == "hive"
608 || catalog_type == "snowflake"
609 || catalog_type == "jdbc"
610 || catalog_type == "rest"
611 || catalog_type == "glue" =>
612 {
613 let (file_io_props, java_catalog_props) =
615 self.build_jni_catalog_configs(java_catalog_props)?;
616 let catalog_impl = match catalog_type {
617 "hive" => "org.apache.iceberg.hive.HiveCatalog",
618 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
619 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
620 "rest" => "org.apache.iceberg.rest.RESTCatalog",
621 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
622 _ => unreachable!(),
623 };
624
625 jni_catalog::JniCatalog::build_catalog(
626 file_io_props,
627 self.catalog_name(),
628 catalog_impl,
629 java_catalog_props,
630 )
631 }
632 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
633 _ => {
634 bail!(
635 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
636 self.catalog_type()
637 )
638 }
639 }
640 }
641
642 pub async fn load_table(
644 &self,
645 java_catalog_props: &HashMap<String, String>,
646 ) -> ConnectorResult<Table> {
647 let catalog = self
648 .create_catalog(java_catalog_props)
649 .await
650 .context("Unable to load iceberg catalog")?;
651
652 let table_id = self
653 .full_table_name()
654 .context("Unable to parse table name")?;
655
656 catalog.load_table(&table_id).await.map_err(Into::into)
657 }
658}