1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
/// Common Iceberg connector options: catalog selection plus credentials for the
/// object stores (S3, GCS, Azure Blob, ADLS Gen2) and for the catalog service.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog backend (`storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`, ...).
    /// When unset, `IcebergCommon::catalog_type()` falls back to `storage`; a `rest`
    /// catalog with vended credentials is routed to `rest_rust`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also the fallback for the Glue region when `glue.region` is unset.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    /// Custom S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    /// S3 access key id. Also the fallback Glue access key.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    /// S3 secret access key. Also the fallback Glue secret key.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    /// IAM role ARN to assume for S3 access (forwarded as the S3 assume-role ARN).
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    /// Glue access key id; falls back to `s3.access.key`.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    /// Glue secret access key; falls back to `s3.secret.key`.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    /// IAM role ARN handed to the Java Glue credential provider.
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    /// Glue region; falls back to `s3.region`.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// Forwarded to the Java Glue catalog as `glue.id`.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    /// GCS credential JSON. For JNI catalogs this is only accepted by REST catalogs.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name; Azure Blob is configured only when name, key and
    /// endpoint are all set.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// ADLS Gen2 account name; used together with `adlsgen2.account_key`.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    /// ADLS Gen2 account key.
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    /// ADLS Gen2 endpoint. NOTE(review): not read in this file.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    /// Warehouse location. Required by every catalog type except the REST ones.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name; `IcebergCommon::catalog_name()` defaults it to `risingwave`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog service URI; mandatory for the Rust REST catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// OAuth2 credential forwarded to REST catalogs as `credential`.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// Bearer token forwarded to REST catalogs as `token`.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// OAuth2 server URI forwarded to REST catalogs as `oauth2-server-uri`.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// OAuth2 scope forwarded to REST catalogs as `scope`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// SigV4 signing region for the Java REST catalog (`rest.signing-region`).
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// SigV4 signing service name for the Java REST catalog (`rest.signing-name`).
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Enables SigV4 for the Java REST catalog; when set, the S3 keys are also
    /// forwarded as the REST signing credentials.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Whether S3 path-style access is used.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Whether SDK config loading is allowed (presumably the default credential/config
    /// chain — confirm). Read through `IcebergCommon::enable_config_load()`, which forces
    /// `false` when the `DISABLE_DEFAULT_CREDENTIAL` environment flag is true.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// `hosted_catalog` flag. NOTE(review): not read in this file; confirm semantics.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// Extra HTTP headers for the catalog service, written as `key=value` pairs
    /// separated by `;`.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Requests vended credentials: adds the `X-Iceberg-Access-Delegation` header and
    /// routes a `rest` catalog to the Rust REST implementation.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    /// Authentication scheme for the Java REST catalog: `none`, `oauth2` or `google`
    /// (case-insensitive). When unset, the OAuth2 options are forwarded as-is.
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    /// GCP auth scopes, used when `catalog.security` is `google`.
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    /// Java `FileIO` implementation class for JNI catalogs; defaults to
    /// `org.apache.iceberg.aws.s3.S3FileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
203
/// Size budgeted for each table's object cache when dividing the shared budget (32 MiB).
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
/// Total byte budget of the process-wide shared object cache (512 MiB).
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
/// Number of per-table caches that fit into the shared budget.
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
210
211impl EnforceSecret for IcebergCommon {
212 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
213 "s3.access.key",
214 "s3.secret.key",
215 "gcs.credential",
216 "catalog.credential",
217 "catalog.token",
218 "catalog.oauth2_server_uri",
219 "adlsgen2.account_key",
220 "adlsgen2.client_secret",
221 "glue.access.key",
222 "glue.secret.key",
223 };
224}
225
/// Location of an Iceberg table inside its catalog.
///
/// Both names may contain `.`; each dot separates namespace levels (see
/// `IcebergTableIdentifier::to_table_ident`). Unknown keys are rejected.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    /// Optional namespace; a dotted value such as `a.b` yields nested namespace levels.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name; any dotted prefix is treated as additional namespace levels.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
237
238impl IcebergTableIdentifier {
239 pub fn database_name(&self) -> Option<&str> {
240 self.database_name.as_deref()
241 }
242
243 pub fn table_name(&self) -> &str {
244 &self.table_name
245 }
246
247 fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
248 let mut parts = Vec::new();
249 if let Some(database_name) = &self.database_name {
250 parts.extend(database_name.split('.'));
251 }
252 parts.extend(self.table_name.split('.'));
253
254 if parts.iter().any(|part| part.is_empty()) {
255 bail!(
256 "Invalid iceberg table identifier '{}': identifier parts must not be empty",
257 self.full_identifier()
258 );
259 }
260
261 Ok(parts)
262 }
263
264 fn full_identifier(&self) -> String {
265 match &self.database_name {
266 Some(database_name) => format!("{}.{}", database_name, self.table_name),
267 None => self.table_name.clone(),
268 }
269 }
270
271 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
272 let ret = TableIdent::from_strs(self.identifier_parts()?);
273
274 Ok(ret.context("Failed to create table identifier")?)
275 }
276
277 pub fn validate(&self) -> ConnectorResult<()> {
278 self.identifier_parts().map(|_| ())
279 }
280}
281
282impl IcebergCommon {
283 pub fn catalog_type(&self) -> &str {
284 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
285 if self.vended_credentials() && catalog_type == "rest" {
286 "rest_rust"
287 } else {
288 catalog_type
289 }
290 }
291
292 pub fn vended_credentials(&self) -> bool {
293 self.vended_credentials.unwrap_or(false)
294 }
295
296 fn glue_access_key(&self) -> Option<&str> {
297 self.glue_access_key
298 .as_deref()
299 .or(self.s3_access_key.as_deref())
300 }
301
302 fn glue_secret_key(&self) -> Option<&str> {
303 self.glue_secret_key
304 .as_deref()
305 .or(self.s3_secret_key.as_deref())
306 }
307
308 fn glue_region(&self) -> Option<&str> {
309 self.glue_region.as_deref().or(self.s3_region.as_deref())
310 }
311
312 pub fn catalog_name(&self) -> String {
313 self.catalog_name
314 .as_ref()
315 .cloned()
316 .unwrap_or_else(|| "risingwave".to_owned())
317 }
318
319 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
320 let mut headers = HashMap::new();
321 let user_agent = match Deployment::current() {
322 Deployment::Ci => "RisingWave(CI)".to_owned(),
323 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
324 Deployment::Other => "RisingWave(OSS)".to_owned(),
325 };
326 if self.vended_credentials() {
327 headers.insert(
328 "X-Iceberg-Access-Delegation".to_owned(),
329 "vended-credentials".to_owned(),
330 );
331 }
332 headers.insert("User-Agent".to_owned(), user_agent);
333 if let Some(header) = &self.catalog_header {
334 for pair in header.split(';') {
335 let mut parts = pair.split('=');
336 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
337 headers.insert(key.to_owned(), value.to_owned());
338 } else {
339 bail!("Invalid header format: {}", pair);
340 }
341 }
342 }
343 Ok(headers)
344 }
345
346 pub fn enable_config_load(&self) -> bool {
347 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
349 if matches!(self.enable_config_load, Some(true)) {
350 tracing::warn!(
351 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
352 );
353 }
354 return false;
355 }
356 self.enable_config_load.unwrap_or(false)
357 }
358
359 fn build_jni_catalog_configs(
361 &self,
362 java_catalog_props: &HashMap<String, String>,
363 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
364 let mut iceberg_configs = HashMap::new();
365 let enable_config_load = self.enable_config_load();
366 let file_io_props = {
367 let catalog_type = self.catalog_type().to_owned();
368
369 if let Some(region) = &self.s3_region {
370 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
372 }
373
374 if let Some(endpoint) = &self.s3_endpoint {
375 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
377 }
378
379 if let Some(access_key) = &self.s3_access_key {
381 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
382 }
383 if let Some(secret_key) = &self.s3_secret_key {
384 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
385 }
386 if let Some(role_arn) = &self.s3_iam_role_arn {
387 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
388 }
389 if let Some(gcs_credential) = &self.gcs_credential {
390 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
391 if catalog_type != "rest" && catalog_type != "rest_rust" {
392 bail!("gcs unsupported in {} catalog", &catalog_type);
393 }
394 }
395
396 if let (
397 Some(azblob_account_name),
398 Some(azblob_account_key),
399 Some(azblob_endpoint_url),
400 ) = (
401 &self.azblob_account_name,
402 &self.azblob_account_key,
403 &self.azblob_endpoint_url,
404 ) {
405 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
406 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
407 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
408
409 if catalog_type != "rest" && catalog_type != "rest_rust" {
410 bail!("azblob unsupported in {} catalog", &catalog_type);
411 }
412 }
413
414 if let (Some(account_name), Some(account_key)) = (
415 self.adlsgen2_account_name.as_ref(),
416 self.adlsgen2_account_key.as_ref(),
417 ) {
418 iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
419 iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
420 if catalog_type != "rest" && catalog_type != "rest_rust" {
421 bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
422 }
423 }
424
425 match &self.warehouse_path {
426 Some(warehouse_path) => {
427 let (bucket, _) = {
428 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
429 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
431 let url = Url::parse(warehouse_path);
432 if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
433 && (catalog_type == "rest" || catalog_type == "rest_rust")
434 {
435 (None, None)
441 } else {
442 let url = url.with_context(|| {
443 format!("Invalid warehouse path: {}", warehouse_path)
444 })?;
445 let bucket = url
446 .host_str()
447 .with_context(|| {
448 format!(
449 "Invalid s3 path: {}, bucket is missing",
450 warehouse_path
451 )
452 })?
453 .to_owned();
454 let root = url.path().trim_start_matches('/').to_owned();
455 (Some(bucket), Some(root))
456 }
457 };
458
459 if let Some(bucket) = bucket {
460 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
461 }
462 }
463 None => {
464 if catalog_type != "rest" && catalog_type != "rest_rust" {
465 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
466 }
467 }
468 }
469 iceberg_configs.insert(
470 S3_DISABLE_CONFIG_LOAD.to_owned(),
471 (!enable_config_load).to_string(),
472 );
473
474 iceberg_configs.insert(
475 GCS_DISABLE_CONFIG_LOAD.to_owned(),
476 (!enable_config_load).to_string(),
477 );
478
479 if let Some(path_style_access) = self.s3_path_style_access {
480 iceberg_configs.insert(
481 S3_PATH_STYLE_ACCESS.to_owned(),
482 path_style_access.to_string(),
483 );
484 }
485
486 iceberg_configs
487 };
488
489 let mut java_catalog_configs = HashMap::new();
491 {
492 if let Some(uri) = self.catalog_uri.as_deref() {
493 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
494 }
495
496 if let Some(warehouse_path) = &self.warehouse_path {
497 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
498 }
499 java_catalog_configs.extend(java_catalog_props.clone());
500
501 let io_impl = self
503 .catalog_io_impl
504 .clone()
505 .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
506 java_catalog_configs.insert("io-impl".to_owned(), io_impl);
507
508 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
510
511 if let Some(region) = &self.s3_region {
512 java_catalog_configs.insert("client.region".to_owned(), region.clone());
513 }
514 if let Some(endpoint) = &self.s3_endpoint {
515 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
516 }
517
518 if let Some(access_key) = &self.s3_access_key {
519 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
520 }
521 if let Some(secret_key) = &self.s3_secret_key {
522 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
523 }
524
525 if let Some(path_style_access) = &self.s3_path_style_access {
526 java_catalog_configs.insert(
527 "s3.path-style-access".to_owned(),
528 path_style_access.to_string(),
529 );
530 }
531
532 let headers = self.headers()?;
533 for (header_name, header_value) in headers {
534 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
535 }
536
537 match self.catalog_type() {
538 "rest" => {
539 if let Some(security) = &self.catalog_security {
541 match security.to_lowercase().as_str() {
542 "google" => {
543 java_catalog_configs.insert(
545 "rest.auth.type".to_owned(),
546 "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
547 );
548 if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
550 java_catalog_configs.insert(
551 "gcp.auth.scopes".to_owned(),
552 gcp_auth_scopes.clone(),
553 );
554 }
555 }
556 "oauth2" => {
557 if let Some(credential) = &self.catalog_credential {
559 java_catalog_configs
560 .insert("credential".to_owned(), credential.clone());
561 }
562 if let Some(token) = &self.catalog_token {
563 java_catalog_configs.insert("token".to_owned(), token.clone());
564 }
565 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
566 java_catalog_configs.insert(
567 "oauth2-server-uri".to_owned(),
568 oauth2_server_uri.clone(),
569 );
570 }
571 if let Some(scope) = &self.catalog_scope {
572 java_catalog_configs.insert("scope".to_owned(), scope.clone());
573 }
574 }
575 "none" | "" => {
576 }
578 _ => {
579 tracing::warn!(
580 "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
581 security
582 );
583 }
584 }
585 } else {
586 if let Some(credential) = &self.catalog_credential {
588 java_catalog_configs
589 .insert("credential".to_owned(), credential.clone());
590 }
591 if let Some(token) = &self.catalog_token {
592 java_catalog_configs.insert("token".to_owned(), token.clone());
593 }
594 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
595 java_catalog_configs
596 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
597 }
598 if let Some(scope) = &self.catalog_scope {
599 java_catalog_configs.insert("scope".to_owned(), scope.clone());
600 }
601 }
602 if let Some(rest_signing_region) = &self.rest_signing_region {
603 java_catalog_configs.insert(
604 "rest.signing-region".to_owned(),
605 rest_signing_region.clone(),
606 );
607 }
608 if let Some(rest_signing_name) = &self.rest_signing_name {
609 java_catalog_configs
610 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
611 }
612 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
613 java_catalog_configs.insert(
614 "rest.sigv4-enabled".to_owned(),
615 rest_sigv4_enabled.to_string(),
616 );
617
618 if let Some(access_key) = &self.s3_access_key {
619 java_catalog_configs
620 .insert("rest.access-key-id".to_owned(), access_key.clone());
621 }
622
623 if let Some(secret_key) = &self.s3_secret_key {
624 java_catalog_configs
625 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
626 }
627 }
628 }
629 "glue" => {
630 let glue_access_key = self.glue_access_key();
631 let glue_secret_key = self.glue_secret_key();
632 let has_glue_credentials =
633 glue_access_key.is_some() && glue_secret_key.is_some();
634 let should_configure_glue_provider = !enable_config_load
635 || has_glue_credentials
636 || self.glue_iam_role_arn.is_some();
637
638 if should_configure_glue_provider {
639 java_catalog_configs.insert(
640 "client.credentials-provider".to_owned(),
641 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
642 );
643 if let Some(region) = self.glue_region() {
644 java_catalog_configs.insert(
645 "client.credentials-provider.glue.region".to_owned(),
646 region.to_owned(),
647 );
648 }
649 if let Some(access_key) = glue_access_key {
650 java_catalog_configs.insert(
651 "client.credentials-provider.glue.access-key-id".to_owned(),
652 access_key.to_owned(),
653 );
654 }
655 if let Some(secret_key) = glue_secret_key {
656 java_catalog_configs.insert(
657 "client.credentials-provider.glue.secret-access-key".to_owned(),
658 secret_key.to_owned(),
659 );
660 }
661 if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
662 java_catalog_configs.insert(
663 "client.credentials-provider.glue.iam-role-arn".to_owned(),
664 role_arn.to_owned(),
665 );
666 }
667 if enable_config_load && !has_glue_credentials {
668 java_catalog_configs.insert(
669 "client.credentials-provider.glue.use-default-credential-chain"
670 .to_owned(),
671 "true".to_owned(),
672 );
673 }
674 }
675
676 if let Some(region) = self.glue_region() {
677 java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
678 java_catalog_configs.insert(
679 "glue.endpoint".to_owned(),
680 format!("https://glue.{}.amazonaws.com", region),
681 );
682 }
683
684 if let Some(glue_id) = self.glue_id.as_deref() {
685 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
686 }
687 self.apply_java_s3_file_io_assume_role_configs(&mut java_catalog_configs);
688 }
689 "jdbc" => {
690 self.apply_java_aws_client_assume_role_configs(&mut java_catalog_configs);
691 }
692 _ => {}
693 }
694 }
695
696 Ok((file_io_props, java_catalog_configs))
697 }
698
699 fn apply_java_s3_file_io_assume_role_configs(
700 &self,
701 java_catalog_configs: &mut HashMap<String, String>,
702 ) {
703 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
704 java_catalog_configs.insert(
705 "s3.client-factory-impl".to_owned(),
706 "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory".to_owned(),
707 );
708 java_catalog_configs.insert("s3.iam-role-arn".to_owned(), iam_role_arn.clone());
709 }
710 }
711
712 fn apply_java_aws_client_assume_role_configs(
713 &self,
714 java_catalog_configs: &mut HashMap<String, String>,
715 ) {
716 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
717 java_catalog_configs.insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
718 java_catalog_configs.insert(
719 "client.factory".to_owned(),
720 "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
721 );
722 if let Some(region) = &self.s3_region {
723 java_catalog_configs.insert("client.assume-role.region".to_owned(), region.clone());
724 }
725 }
726 }
727}
728
729impl IcebergCommon {
730 pub async fn create_catalog(
732 &self,
733 java_catalog_props: &HashMap<String, String>,
734 ) -> ConnectorResult<Arc<dyn Catalog>> {
735 match self.catalog_type() {
736 "storage" => {
737 let warehouse = self
738 .warehouse_path
739 .clone()
740 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
741 let url = Url::parse(warehouse.as_ref())
742 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
743
744 let config = match url.scheme() {
745 "s3" | "s3a" => StorageCatalogConfig::S3(
746 storage_catalog::StorageCatalogS3Config::builder()
747 .warehouse(warehouse)
748 .access_key(self.s3_access_key.clone())
749 .secret_key(self.s3_secret_key.clone())
750 .region(self.s3_region.clone())
751 .endpoint(self.s3_endpoint.clone())
752 .path_style_access(self.s3_path_style_access)
753 .enable_config_load(Some(self.enable_config_load()))
754 .build(),
755 ),
756 "gs" | "gcs" => StorageCatalogConfig::Gcs(
757 storage_catalog::StorageCatalogGcsConfig::builder()
758 .warehouse(warehouse)
759 .credential(self.gcs_credential.clone())
760 .enable_config_load(Some(self.enable_config_load()))
761 .build(),
762 ),
763 "azblob" => StorageCatalogConfig::Azblob(
764 storage_catalog::StorageCatalogAzblobConfig::builder()
765 .warehouse(warehouse)
766 .account_name(self.azblob_account_name.clone())
767 .account_key(self.azblob_account_key.clone())
768 .endpoint(self.azblob_endpoint_url.clone())
769 .build(),
770 ),
771 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
772 };
773
774 let catalog = storage_catalog::StorageCatalog::new(config)?;
775 Ok(Arc::new(catalog))
776 }
777 "rest_rust" => {
778 let mut iceberg_configs = HashMap::new();
779
780 if let Some(gcs_credential) = &self.gcs_credential {
782 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
783 } else {
784 if let Some(region) = &self.s3_region {
785 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
786 }
787 if let Some(endpoint) = &self.s3_endpoint {
788 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
789 }
790 if let Some(access_key) = &self.s3_access_key {
791 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
792 }
793 if let Some(secret_key) = &self.s3_secret_key {
794 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
795 }
796 if let Some(path_style_access) = &self.s3_path_style_access {
797 iceberg_configs.insert(
798 S3_PATH_STYLE_ACCESS.to_owned(),
799 path_style_access.to_string(),
800 );
801 }
802 };
803
804 if let Some(credential) = &self.catalog_credential {
805 iceberg_configs.insert("credential".to_owned(), credential.clone());
806 }
807 if let Some(token) = &self.catalog_token {
808 iceberg_configs.insert("token".to_owned(), token.clone());
809 }
810 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
811 iceberg_configs
812 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
813 }
814 if let Some(scope) = &self.catalog_scope {
815 iceberg_configs.insert("scope".to_owned(), scope.clone());
816 }
817
818 let headers = self.headers()?;
819 for (header_name, header_value) in headers {
820 iceberg_configs.insert(format!("header.{}", header_name), header_value);
821 }
822
823 iceberg_configs.insert(
824 iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
825 self.catalog_uri
826 .clone()
827 .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
828 );
829 if let Some(warehouse_path) = &self.warehouse_path {
830 iceberg_configs.insert(
831 iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
832 warehouse_path.clone(),
833 );
834 }
835 let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
836 .load("rest", iceberg_configs)
837 .await
838 .map_err(|e| anyhow!(IcebergError::from(e)))?;
839 Ok(Arc::new(catalog))
840 }
841 "glue_rust" => {
842 let mut iceberg_configs = HashMap::new();
843 if let Some(region) = self.glue_region() {
845 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
846 }
847 if let Some(access_key) = self.glue_access_key() {
848 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
849 }
850 if let Some(secret_key) = self.glue_secret_key() {
851 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
852 }
853 if let Some(region) = &self.s3_region {
855 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
856 }
857 if let Some(endpoint) = &self.s3_endpoint {
858 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
859 }
860 if let Some(access_key) = &self.s3_access_key {
861 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
862 }
863 if let Some(secret_key) = &self.s3_secret_key {
864 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
865 }
866 if let Some(role_arn) = &self.s3_iam_role_arn {
867 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
868 }
869 if let Some(path_style_access) = &self.s3_path_style_access {
870 iceberg_configs.insert(
871 S3_PATH_STYLE_ACCESS.to_owned(),
872 path_style_access.to_string(),
873 );
874 }
875 iceberg_configs.insert(
876 iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
877 self.warehouse_path
878 .clone()
879 .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
880 );
881 if let Some(uri) = self.catalog_uri.as_deref() {
882 iceberg_configs.insert(
883 iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
884 uri.to_owned(),
885 );
886 }
887 let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
888 .load("glue", iceberg_configs)
889 .await
890 .map_err(|e| anyhow!(IcebergError::from(e)))?;
891 Ok(Arc::new(catalog))
892 }
893 catalog_type
894 if catalog_type == "hive"
895 || catalog_type == "snowflake"
896 || catalog_type == "jdbc"
897 || catalog_type == "rest"
898 || catalog_type == "glue" =>
899 {
900 let (file_io_props, java_catalog_props) =
902 self.build_jni_catalog_configs(java_catalog_props)?;
903 let catalog_impl = match catalog_type {
904 "hive" => "org.apache.iceberg.hive.HiveCatalog",
905 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
906 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
907 "rest" => "org.apache.iceberg.rest.RESTCatalog",
908 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
909 _ => unreachable!(),
910 };
911
912 jni_catalog::JniCatalog::build_catalog(
913 file_io_props,
914 self.catalog_name(),
915 catalog_impl,
916 java_catalog_props,
917 )
918 }
919 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
920 _ => {
921 bail!(
922 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
923 self.catalog_type()
924 )
925 }
926 }
927 }
928
929 pub async fn load_table(
931 &self,
932 table: &IcebergTableIdentifier,
933 java_catalog_props: &HashMap<String, String>,
934 ) -> ConnectorResult<Table> {
935 let catalog = self
936 .create_catalog(java_catalog_props)
937 .await
938 .context("Unable to load iceberg catalog")?;
939
940 let table_id = table
941 .to_table_ident()
942 .context("Unable to parse table name")?;
943
944 let table = catalog.load_table(&table_id).await?;
945 Ok(rebuild_table_with_shared_cache(table).await)
946 }
947}
948
949pub(crate) async fn shared_object_cache(
951 init_object_cache: Arc<ObjectCache>,
952 table_uuid: Uuid,
953) -> Arc<ObjectCache> {
954 static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
955 MokaCache::builder()
956 .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
957 .build()
958 });
959
960 CACHE
961 .get_with(table_uuid, async { init_object_cache })
962 .await
963}
964
965pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
966 let table_uuid = table.metadata().uuid();
967 let init_object_cache = table.object_cache();
968 let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
969 table.with_object_cache(object_cache)
970}
971
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Options for `catalog_type` with S3 region `ap-southeast-2`, warehouse
    /// `s3://bucket/warehouse`, and everything else unset.
    fn test_common(catalog_type: &str) -> IcebergCommon {
        IcebergCommon {
            catalog_type: Some(catalog_type.to_owned()),
            s3_region: Some("ap-southeast-2".to_owned()),
            warehouse_path: Some("s3://bucket/warehouse".to_owned()),
            s3_endpoint: None,
            s3_access_key: None,
            s3_secret_key: None,
            s3_iam_role_arn: None,
            s3_path_style_access: None,
            glue_access_key: None,
            glue_secret_key: None,
            glue_iam_role_arn: None,
            glue_region: None,
            glue_id: None,
            gcs_credential: None,
            azblob_account_name: None,
            azblob_account_key: None,
            azblob_endpoint_url: None,
            adlsgen2_account_name: None,
            adlsgen2_account_key: None,
            adlsgen2_endpoint: None,
            catalog_name: None,
            catalog_uri: None,
            catalog_credential: None,
            catalog_token: None,
            catalog_oauth2_server_uri: None,
            catalog_scope: None,
            rest_signing_region: None,
            rest_signing_name: None,
            rest_sigv4_enabled: None,
            enable_config_load: None,
            hosted_catalog: None,
            catalog_header: None,
            vended_credentials: None,
            catalog_security: None,
            gcp_auth_scopes: None,
            catalog_io_impl: None,
        }
    }

    /// Builds an identifier from borrowed parts.
    fn ident(database_name: Option<&str>, table_name: &str) -> IcebergTableIdentifier {
        IcebergTableIdentifier {
            database_name: database_name.map(str::to_owned),
            table_name: table_name.to_owned(),
        }
    }

    #[test]
    fn test_glue_jni_catalog_uses_s3_assume_role_for_file_io() {
        let role_arn = "arn:aws:iam::123456789012:role/risingwave-s3";
        let mut common = test_common("glue");
        common.s3_iam_role_arn = Some(role_arn.to_owned());

        let (_, java_configs) = common
            .build_jni_catalog_configs(&HashMap::new())
            .expect("glue JNI configs should build");

        assert_eq!(
            java_configs.get("s3.client-factory-impl").map(String::as_str),
            Some("com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory")
        );
        assert_eq!(
            java_configs.get("s3.iam-role-arn").map(String::as_str),
            Some(role_arn)
        );
        assert!(java_configs.get("client.factory").is_none());
    }

    #[test]
    fn test_iceberg_table_identifier_validation() {
        let accepted = [
            (Some("valid_db"), "test_table"),
            (Some("valid_db_name"), "test_table"),
            (None, "test_table"),
        ];
        for (database_name, table_name) in accepted {
            assert!(ident(database_name, table_name).validate().is_ok());
        }

        let rejected = [(Some("a..b"), "test_table"), (None, ".test_table")];
        for (database_name, table_name) in rejected {
            let err = ident(database_name, table_name)
                .validate()
                .expect_err("empty identifier parts must be rejected");
            assert!(
                err.to_string()
                    .contains("identifier parts must not be empty")
            );
        }
    }

    #[test]
    fn test_iceberg_table_identifier_dots_as_namespace_separators() {
        let equivalent_spellings = [
            (Some("general.zia.stats"), "tagged_security_transactions"),
            (Some("general"), "zia.stats.tagged_security_transactions"),
            (None, "general.zia.stats.tagged_security_transactions"),
        ];
        for (database_name, table_name) in equivalent_spellings {
            let table_ident = ident(database_name, table_name).to_table_ident().unwrap();
            let namespace: Vec<&str> = table_ident
                .namespace()
                .as_ref()
                .iter()
                .map(String::as_str)
                .collect();
            assert_eq!(namespace, vec!["general", "zia", "stats"]);
            assert_eq!(table_ident.name(), "tagged_security_transactions");
        }
    }
}