1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
/// Common Iceberg catalog and object-storage options.
///
/// Property keys come from each field's `serde(rename)`; `enable_config_load` and
/// `vended_credentials` have no rename, so their field names are the keys. Boolean options
/// are parsed from strings via `deserialize_optional_bool_from_string`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog implementation. `catalog_type()` defaults to `storage` when unset and reports
    // `rest` as `rest_rust` when vended credentials are enabled.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // Region for the S3 file IO and the Java `client.region`; fallback for `glue.region`.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    // Custom S3 endpoint for the file IO and the Java `s3.endpoint` property.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    // Static S3 credentials. They are also the fallback Glue credentials and, when
    // `catalog.rest.sigv4_enabled` is set, the REST signing credentials.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    // Role assumed by the S3 file IO. For `jdbc` catalogs it also configures the Java
    // assume-role client factory.
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue credentials; each falls back to the corresponding `s3.*` key when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    // Role ARN handed to `GlueCredentialProvider` for JNI Glue catalogs.
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    // Glue region (fallback: `s3.region`). For JNI Glue catalogs it also determines the
    // `glue.endpoint` URL.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    // Forwarded as `glue.id` to the JNI Glue catalog.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credentials JSON. JNI catalogs only accept it with a REST catalog.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob settings. The JNI file IO applies them only when all three are set, and
    // only for REST catalogs.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 settings. The JNI file IO applies name and key together, REST catalogs only.
    // NOTE(review): `adlsgen2.endpoint` is not read in this view; confirm it is used elsewhere.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    // Warehouse location.
    // Required for `storage`, `glue_rust` and non-REST JNI catalogs.
    // For JNI catalogs, its URL host becomes `iceberg.table.io.bucket`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    // Catalog name; `catalog_name()` defaults it to `risingwave`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    // Catalog service URI; required by `rest_rust`.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    // OAuth2 settings forwarded to REST catalogs as `credential`, `token`,
    // `oauth2-server-uri` and `scope`.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    // SigV4 request-signing region for JNI REST catalogs (`rest.signing-region`).
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    // SigV4 request-signing service name for JNI REST catalogs (`rest.signing-name`).
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    // Forwarded as `rest.sigv4-enabled`. When set (true or false), the S3 keys are also
    // forwarded as the REST signing credentials.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // S3 path-style access flag for the file IO and the Java `s3.path-style-access`.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    // Key `enable_config_load`. Whether implicit config/credential loading is allowed.
    // `enable_config_load()` forces it off when `DISABLE_DEFAULT_CREDENTIAL` is set.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    // NOTE(review): not read in this view; confirm where it is consumed.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    // Extra catalog headers, written as `key=value` pairs separated by `;`.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    // Key `vended_credentials`. Adds `X-Iceberg-Access-Delegation: vended-credentials` and
    // makes a `rest` catalog use the Rust REST implementation (`rest_rust`).
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    // JNI REST auth mode: `google`, `oauth2`, or `none`/empty (case-insensitive).
    // Other values only log a warning.
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    // Forwarded as `gcp.auth.scopes`, only when `catalog.security` is `google`.
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    // Java FileIO class for JNI catalogs; defaults to `org.apache.iceberg.aws.s3.S3FileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
197
/// Assumed size of one table's object cache: 32 MiB.
/// NOTE(review): presumably mirrors the default iceberg object-cache size; confirm.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
/// Nominal byte budget for all shared object caches: 512 MiB. It is only enforced
/// indirectly, through the table-count limit below.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
/// Maximum number of per-table caches kept in the shared registry. This is the budget
/// divided by the assumed per-table size, i.e. 16.
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
204
205impl EnforceSecret for IcebergCommon {
206 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
207 "s3.access.key",
208 "s3.secret.key",
209 "gcs.credential",
210 "catalog.credential",
211 "catalog.token",
212 "catalog.oauth2_server_uri",
213 "adlsgen2.account_key",
214 "adlsgen2.client_secret",
215 "glue.access.key",
216 "glue.secret.key",
217 };
218}
219
/// An optional database name plus a table name identifying an Iceberg table.
///
/// Deserialization rejects unknown fields.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Read from `database.name`; `validate()` rejects names containing `.`.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    // Read from `table.name`; mandatory (not an `Option`, no default).
    #[serde(rename = "table.name")]
    pub table_name: String,
}
230
231impl IcebergTableIdentifier {
232 pub fn database_name(&self) -> Option<&str> {
233 self.database_name.as_deref()
234 }
235
236 pub fn table_name(&self) -> &str {
237 &self.table_name
238 }
239
240 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
241 let ret = if let Some(database_name) = &self.database_name {
242 TableIdent::from_strs(vec![database_name, &self.table_name])
243 } else {
244 TableIdent::from_strs(vec![&self.table_name])
245 };
246
247 Ok(ret.context("Failed to create table identifier")?)
248 }
249
250 pub fn validate(&self) -> ConnectorResult<()> {
251 if let Some(database_name) = &self.database_name
252 && database_name.contains('.')
253 {
254 bail!(
255 "Invalid database.name '{}': dots are not allowed in database names",
256 database_name
257 );
258 }
259 Ok(())
260 }
261}
262
263impl IcebergCommon {
264 pub fn catalog_type(&self) -> &str {
265 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
266 if self.vended_credentials() && catalog_type == "rest" {
267 "rest_rust"
268 } else {
269 catalog_type
270 }
271 }
272
273 pub fn vended_credentials(&self) -> bool {
274 self.vended_credentials.unwrap_or(false)
275 }
276
277 fn glue_access_key(&self) -> Option<&str> {
278 self.glue_access_key
279 .as_deref()
280 .or(self.s3_access_key.as_deref())
281 }
282
283 fn glue_secret_key(&self) -> Option<&str> {
284 self.glue_secret_key
285 .as_deref()
286 .or(self.s3_secret_key.as_deref())
287 }
288
289 fn glue_region(&self) -> Option<&str> {
290 self.glue_region.as_deref().or(self.s3_region.as_deref())
291 }
292
293 pub fn catalog_name(&self) -> String {
294 self.catalog_name
295 .as_ref()
296 .cloned()
297 .unwrap_or_else(|| "risingwave".to_owned())
298 }
299
300 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
301 let mut headers = HashMap::new();
302 let user_agent = match Deployment::current() {
303 Deployment::Ci => "RisingWave(CI)".to_owned(),
304 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
305 Deployment::Other => "RisingWave(OSS)".to_owned(),
306 };
307 if self.vended_credentials() {
308 headers.insert(
309 "X-Iceberg-Access-Delegation".to_owned(),
310 "vended-credentials".to_owned(),
311 );
312 }
313 headers.insert("User-Agent".to_owned(), user_agent);
314 if let Some(header) = &self.catalog_header {
315 for pair in header.split(';') {
316 let mut parts = pair.split('=');
317 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
318 headers.insert(key.to_owned(), value.to_owned());
319 } else {
320 bail!("Invalid header format: {}", pair);
321 }
322 }
323 }
324 Ok(headers)
325 }
326
327 pub fn enable_config_load(&self) -> bool {
328 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
330 if matches!(self.enable_config_load, Some(true)) {
331 tracing::warn!(
332 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
333 );
334 }
335 return false;
336 }
337 self.enable_config_load.unwrap_or(false)
338 }
339
340 fn build_jni_catalog_configs(
342 &self,
343 java_catalog_props: &HashMap<String, String>,
344 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
345 let mut iceberg_configs = HashMap::new();
346 let enable_config_load = self.enable_config_load();
347 let file_io_props = {
348 let catalog_type = self.catalog_type().to_owned();
349
350 if let Some(region) = &self.s3_region {
351 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
353 }
354
355 if let Some(endpoint) = &self.s3_endpoint {
356 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
358 }
359
360 if let Some(access_key) = &self.s3_access_key {
362 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
363 }
364 if let Some(secret_key) = &self.s3_secret_key {
365 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
366 }
367 if let Some(role_arn) = &self.s3_iam_role_arn {
368 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
369 }
370 if let Some(gcs_credential) = &self.gcs_credential {
371 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
372 if catalog_type != "rest" && catalog_type != "rest_rust" {
373 bail!("gcs unsupported in {} catalog", &catalog_type);
374 }
375 }
376
377 if let (
378 Some(azblob_account_name),
379 Some(azblob_account_key),
380 Some(azblob_endpoint_url),
381 ) = (
382 &self.azblob_account_name,
383 &self.azblob_account_key,
384 &self.azblob_endpoint_url,
385 ) {
386 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
387 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
388 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
389
390 if catalog_type != "rest" && catalog_type != "rest_rust" {
391 bail!("azblob unsupported in {} catalog", &catalog_type);
392 }
393 }
394
395 if let (Some(account_name), Some(account_key)) = (
396 self.adlsgen2_account_name.as_ref(),
397 self.adlsgen2_account_key.as_ref(),
398 ) {
399 iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
400 iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
401 if catalog_type != "rest" && catalog_type != "rest_rust" {
402 bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
403 }
404 }
405
406 match &self.warehouse_path {
407 Some(warehouse_path) => {
408 let (bucket, _) = {
409 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
410 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
412 let url = Url::parse(warehouse_path);
413 if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
414 && (catalog_type == "rest" || catalog_type == "rest_rust")
415 {
416 (None, None)
422 } else {
423 let url = url.with_context(|| {
424 format!("Invalid warehouse path: {}", warehouse_path)
425 })?;
426 let bucket = url
427 .host_str()
428 .with_context(|| {
429 format!(
430 "Invalid s3 path: {}, bucket is missing",
431 warehouse_path
432 )
433 })?
434 .to_owned();
435 let root = url.path().trim_start_matches('/').to_owned();
436 (Some(bucket), Some(root))
437 }
438 };
439
440 if let Some(bucket) = bucket {
441 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
442 }
443 }
444 None => {
445 if catalog_type != "rest" && catalog_type != "rest_rust" {
446 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
447 }
448 }
449 }
450 iceberg_configs.insert(
451 S3_DISABLE_CONFIG_LOAD.to_owned(),
452 (!enable_config_load).to_string(),
453 );
454
455 iceberg_configs.insert(
456 GCS_DISABLE_CONFIG_LOAD.to_owned(),
457 (!enable_config_load).to_string(),
458 );
459
460 if let Some(path_style_access) = self.s3_path_style_access {
461 iceberg_configs.insert(
462 S3_PATH_STYLE_ACCESS.to_owned(),
463 path_style_access.to_string(),
464 );
465 }
466
467 iceberg_configs
468 };
469
470 let mut java_catalog_configs = HashMap::new();
472 {
473 if let Some(uri) = self.catalog_uri.as_deref() {
474 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
475 }
476
477 if let Some(warehouse_path) = &self.warehouse_path {
478 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
479 }
480 java_catalog_configs.extend(java_catalog_props.clone());
481
482 let io_impl = self
484 .catalog_io_impl
485 .clone()
486 .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
487 java_catalog_configs.insert("io-impl".to_owned(), io_impl);
488
489 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
491
492 if let Some(region) = &self.s3_region {
493 java_catalog_configs.insert("client.region".to_owned(), region.clone());
494 }
495 if let Some(endpoint) = &self.s3_endpoint {
496 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
497 }
498
499 if let Some(access_key) = &self.s3_access_key {
500 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
501 }
502 if let Some(secret_key) = &self.s3_secret_key {
503 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
504 }
505
506 if let Some(path_style_access) = &self.s3_path_style_access {
507 java_catalog_configs.insert(
508 "s3.path-style-access".to_owned(),
509 path_style_access.to_string(),
510 );
511 }
512
513 let headers = self.headers()?;
514 for (header_name, header_value) in headers {
515 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
516 }
517
518 match self.catalog_type() {
519 "rest" => {
520 if let Some(security) = &self.catalog_security {
522 match security.to_lowercase().as_str() {
523 "google" => {
524 java_catalog_configs.insert(
526 "rest.auth.type".to_owned(),
527 "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
528 );
529 if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
531 java_catalog_configs.insert(
532 "gcp.auth.scopes".to_owned(),
533 gcp_auth_scopes.clone(),
534 );
535 }
536 }
537 "oauth2" => {
538 if let Some(credential) = &self.catalog_credential {
540 java_catalog_configs
541 .insert("credential".to_owned(), credential.clone());
542 }
543 if let Some(token) = &self.catalog_token {
544 java_catalog_configs.insert("token".to_owned(), token.clone());
545 }
546 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
547 java_catalog_configs.insert(
548 "oauth2-server-uri".to_owned(),
549 oauth2_server_uri.clone(),
550 );
551 }
552 if let Some(scope) = &self.catalog_scope {
553 java_catalog_configs.insert("scope".to_owned(), scope.clone());
554 }
555 }
556 "none" | "" => {
557 }
559 _ => {
560 tracing::warn!(
561 "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
562 security
563 );
564 }
565 }
566 } else {
567 if let Some(credential) = &self.catalog_credential {
569 java_catalog_configs
570 .insert("credential".to_owned(), credential.clone());
571 }
572 if let Some(token) = &self.catalog_token {
573 java_catalog_configs.insert("token".to_owned(), token.clone());
574 }
575 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
576 java_catalog_configs
577 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
578 }
579 if let Some(scope) = &self.catalog_scope {
580 java_catalog_configs.insert("scope".to_owned(), scope.clone());
581 }
582 }
583 if let Some(rest_signing_region) = &self.rest_signing_region {
584 java_catalog_configs.insert(
585 "rest.signing-region".to_owned(),
586 rest_signing_region.clone(),
587 );
588 }
589 if let Some(rest_signing_name) = &self.rest_signing_name {
590 java_catalog_configs
591 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
592 }
593 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
594 java_catalog_configs.insert(
595 "rest.sigv4-enabled".to_owned(),
596 rest_sigv4_enabled.to_string(),
597 );
598
599 if let Some(access_key) = &self.s3_access_key {
600 java_catalog_configs
601 .insert("rest.access-key-id".to_owned(), access_key.clone());
602 }
603
604 if let Some(secret_key) = &self.s3_secret_key {
605 java_catalog_configs
606 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
607 }
608 }
609 }
610 "glue" => {
611 let glue_access_key = self.glue_access_key();
612 let glue_secret_key = self.glue_secret_key();
613 let has_glue_credentials =
614 glue_access_key.is_some() && glue_secret_key.is_some();
615 let should_configure_glue_provider = !enable_config_load
616 || has_glue_credentials
617 || self.glue_iam_role_arn.is_some();
618
619 if should_configure_glue_provider {
620 java_catalog_configs.insert(
621 "client.credentials-provider".to_owned(),
622 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
623 );
624 if let Some(region) = self.glue_region() {
625 java_catalog_configs.insert(
626 "client.credentials-provider.glue.region".to_owned(),
627 region.to_owned(),
628 );
629 }
630 if let Some(access_key) = glue_access_key {
631 java_catalog_configs.insert(
632 "client.credentials-provider.glue.access-key-id".to_owned(),
633 access_key.to_owned(),
634 );
635 }
636 if let Some(secret_key) = glue_secret_key {
637 java_catalog_configs.insert(
638 "client.credentials-provider.glue.secret-access-key".to_owned(),
639 secret_key.to_owned(),
640 );
641 }
642 if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
643 java_catalog_configs.insert(
644 "client.credentials-provider.glue.iam-role-arn".to_owned(),
645 role_arn.to_owned(),
646 );
647 }
648 if enable_config_load && !has_glue_credentials {
649 java_catalog_configs.insert(
650 "client.credentials-provider.glue.use-default-credential-chain"
651 .to_owned(),
652 "true".to_owned(),
653 );
654 }
655 }
656
657 if let Some(region) = self.glue_region() {
658 java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
659 java_catalog_configs.insert(
660 "glue.endpoint".to_owned(),
661 format!("https://glue.{}.amazonaws.com", region),
662 );
663 }
664
665 if let Some(glue_id) = self.glue_id.as_deref() {
666 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
667 }
668 }
669 "jdbc" => {
670 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
671 java_catalog_configs
672 .insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
673 java_catalog_configs.insert(
674 "client.factory".to_owned(),
675 "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
676 );
677 if let Some(region) = &self.s3_region {
678 java_catalog_configs
679 .insert("client.assume-role.region".to_owned(), region.clone());
680 }
681 }
682 }
683 _ => {}
684 }
685 }
686
687 Ok((file_io_props, java_catalog_configs))
688 }
689}
690
691impl IcebergCommon {
692 pub async fn create_catalog(
694 &self,
695 java_catalog_props: &HashMap<String, String>,
696 ) -> ConnectorResult<Arc<dyn Catalog>> {
697 match self.catalog_type() {
698 "storage" => {
699 let warehouse = self
700 .warehouse_path
701 .clone()
702 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
703 let url = Url::parse(warehouse.as_ref())
704 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
705
706 let config = match url.scheme() {
707 "s3" | "s3a" => StorageCatalogConfig::S3(
708 storage_catalog::StorageCatalogS3Config::builder()
709 .warehouse(warehouse)
710 .access_key(self.s3_access_key.clone())
711 .secret_key(self.s3_secret_key.clone())
712 .region(self.s3_region.clone())
713 .endpoint(self.s3_endpoint.clone())
714 .path_style_access(self.s3_path_style_access)
715 .enable_config_load(Some(self.enable_config_load()))
716 .build(),
717 ),
718 "gs" | "gcs" => StorageCatalogConfig::Gcs(
719 storage_catalog::StorageCatalogGcsConfig::builder()
720 .warehouse(warehouse)
721 .credential(self.gcs_credential.clone())
722 .enable_config_load(Some(self.enable_config_load()))
723 .build(),
724 ),
725 "azblob" => StorageCatalogConfig::Azblob(
726 storage_catalog::StorageCatalogAzblobConfig::builder()
727 .warehouse(warehouse)
728 .account_name(self.azblob_account_name.clone())
729 .account_key(self.azblob_account_key.clone())
730 .endpoint(self.azblob_endpoint_url.clone())
731 .build(),
732 ),
733 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
734 };
735
736 let catalog = storage_catalog::StorageCatalog::new(config)?;
737 Ok(Arc::new(catalog))
738 }
739 "rest_rust" => {
740 let mut iceberg_configs = HashMap::new();
741
742 if let Some(gcs_credential) = &self.gcs_credential {
744 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
745 } else {
746 if let Some(region) = &self.s3_region {
747 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
748 }
749 if let Some(endpoint) = &self.s3_endpoint {
750 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
751 }
752 if let Some(access_key) = &self.s3_access_key {
753 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
754 }
755 if let Some(secret_key) = &self.s3_secret_key {
756 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
757 }
758 if let Some(path_style_access) = &self.s3_path_style_access {
759 iceberg_configs.insert(
760 S3_PATH_STYLE_ACCESS.to_owned(),
761 path_style_access.to_string(),
762 );
763 }
764 };
765
766 if let Some(credential) = &self.catalog_credential {
767 iceberg_configs.insert("credential".to_owned(), credential.clone());
768 }
769 if let Some(token) = &self.catalog_token {
770 iceberg_configs.insert("token".to_owned(), token.clone());
771 }
772 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
773 iceberg_configs
774 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
775 }
776 if let Some(scope) = &self.catalog_scope {
777 iceberg_configs.insert("scope".to_owned(), scope.clone());
778 }
779
780 let headers = self.headers()?;
781 for (header_name, header_value) in headers {
782 iceberg_configs.insert(format!("header.{}", header_name), header_value);
783 }
784
785 iceberg_configs.insert(
786 iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
787 self.catalog_uri
788 .clone()
789 .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
790 );
791 if let Some(warehouse_path) = &self.warehouse_path {
792 iceberg_configs.insert(
793 iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
794 warehouse_path.clone(),
795 );
796 }
797 let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
798 .load("rest", iceberg_configs)
799 .await
800 .map_err(|e| anyhow!(IcebergError::from(e)))?;
801 Ok(Arc::new(catalog))
802 }
803 "glue_rust" => {
804 let mut iceberg_configs = HashMap::new();
805 if let Some(region) = self.glue_region() {
807 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
808 }
809 if let Some(access_key) = self.glue_access_key() {
810 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
811 }
812 if let Some(secret_key) = self.glue_secret_key() {
813 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
814 }
815 if let Some(region) = &self.s3_region {
817 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
818 }
819 if let Some(endpoint) = &self.s3_endpoint {
820 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
821 }
822 if let Some(access_key) = &self.s3_access_key {
823 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
824 }
825 if let Some(secret_key) = &self.s3_secret_key {
826 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
827 }
828 if let Some(role_arn) = &self.s3_iam_role_arn {
829 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
830 }
831 if let Some(path_style_access) = &self.s3_path_style_access {
832 iceberg_configs.insert(
833 S3_PATH_STYLE_ACCESS.to_owned(),
834 path_style_access.to_string(),
835 );
836 }
837 iceberg_configs.insert(
838 iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
839 self.warehouse_path
840 .clone()
841 .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
842 );
843 if let Some(uri) = self.catalog_uri.as_deref() {
844 iceberg_configs.insert(
845 iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
846 uri.to_owned(),
847 );
848 }
849 let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
850 .load("glue", iceberg_configs)
851 .await
852 .map_err(|e| anyhow!(IcebergError::from(e)))?;
853 Ok(Arc::new(catalog))
854 }
855 catalog_type
856 if catalog_type == "hive"
857 || catalog_type == "snowflake"
858 || catalog_type == "jdbc"
859 || catalog_type == "rest"
860 || catalog_type == "glue" =>
861 {
862 let (file_io_props, java_catalog_props) =
864 self.build_jni_catalog_configs(java_catalog_props)?;
865 let catalog_impl = match catalog_type {
866 "hive" => "org.apache.iceberg.hive.HiveCatalog",
867 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
868 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
869 "rest" => "org.apache.iceberg.rest.RESTCatalog",
870 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
871 _ => unreachable!(),
872 };
873
874 jni_catalog::JniCatalog::build_catalog(
875 file_io_props,
876 self.catalog_name(),
877 catalog_impl,
878 java_catalog_props,
879 )
880 }
881 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
882 _ => {
883 bail!(
884 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
885 self.catalog_type()
886 )
887 }
888 }
889 }
890
891 pub async fn load_table(
893 &self,
894 table: &IcebergTableIdentifier,
895 java_catalog_props: &HashMap<String, String>,
896 ) -> ConnectorResult<Table> {
897 let catalog = self
898 .create_catalog(java_catalog_props)
899 .await
900 .context("Unable to load iceberg catalog")?;
901
902 let table_id = table
903 .to_table_ident()
904 .context("Unable to parse table name")?;
905
906 let table = catalog.load_table(&table_id).await?;
907 Ok(rebuild_table_with_shared_cache(table).await)
908 }
909}
910
911pub(crate) async fn shared_object_cache(
913 init_object_cache: Arc<ObjectCache>,
914 table_uuid: Uuid,
915) -> Arc<ObjectCache> {
916 static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
917 MokaCache::builder()
918 .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
919 .build()
920 });
921
922 CACHE
923 .get_with(table_uuid, async { init_object_cache })
924 .await
925}
926
927pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
928 let table_uuid = table.metadata().uuid();
929 let init_object_cache = table.object_cache();
930 let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
931 table.with_object_cache(object_cache)
932}
933
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an identifier for `test_table` in the given (optional) database.
    fn identifier(database_name: Option<&str>) -> IcebergTableIdentifier {
        IcebergTableIdentifier {
            database_name: database_name.map(str::to_owned),
            table_name: "test_table".to_owned(),
        }
    }

    #[test]
    fn test_iceberg_table_identifier_validation() {
        // Database names without dots, or no database at all, are accepted.
        for database_name in [Some("valid_db"), Some("valid_db_name"), None] {
            assert!(identifier(database_name).validate().is_ok());
        }

        // Any dot in the database name is rejected with a descriptive error.
        for database_name in ["a.b", "a.b.c"] {
            let err = identifier(Some(database_name))
                .validate()
                .expect_err("dotted database names must be rejected");
            assert!(err.to_string().contains("dots are not allowed"));
        }
    }
}