1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
32 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
33};
34use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
35use moka::future::Cache as MokaCache;
36use phf::{Set, phf_set};
37use risingwave_common::bail;
38use risingwave_common::error::IcebergError;
39use risingwave_common::util::deployment::Deployment;
40use risingwave_common::util::env_var::env_var_is_true;
41use serde::Deserialize;
42use serde_with::serde_as;
43use url::Url;
44use uuid::Uuid;
45use with_options::WithOptions;
46
47use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
48use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
49use crate::deserialize_optional_bool_from_string;
50use crate::enforce_secret::EnforceSecret;
51use crate::error::ConnectorResult;
52
// Catalog, storage and credential options shared by the Iceberg connectors.
//
// Each field is read from the connector's `WITH (...)` options under the key given by its
// `rename` attribute (or the field name itself when there is none). Every field is optional
// at the type level; which ones are actually required depends on the selected catalog type
// (`create_catalog` / `build_jni_catalog_configs` reject missing required options).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    // Catalog implementation, e.g. `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`,
    // `rest_rust`, `glue_rust`. `IcebergCommon::catalog_type` falls back to `storage` when
    // this is unset.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    // S3 region; also used as the fallback for `glue.region`.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    // Custom (S3-compatible) endpoint.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    // S3 access key id; also used as the fallback for `glue.access.key`.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    // S3 secret access key; also used as the fallback for `glue.secret.key`.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    // IAM role to assume for S3 access (FileIO assume-role, JDBC assume-role client factory).
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    // Glue access key id; falls back to `s3.access.key` when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    // Glue secret access key; falls back to `s3.secret.key` when unset.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    // IAM role handed to RisingWave's Glue credential provider (JNI `glue` catalog).
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    // Glue region; falls back to `s3.region` when unset.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    // Glue catalog id, forwarded to the Java catalog as `glue.id`.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    // GCS credential, passed to FileIO as `GCS_CREDENTIALS_JSON`. On the JNI path it is only
    // accepted for REST catalogs.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    // Azure Blob account name; on the JNI path all three `azblob.*` options must be set
    // for any of them to take effect.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    // Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    // Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    // ADLS Gen2 account name; used together with `adlsgen2.account_key`.
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    // ADLS Gen2 account key.
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    // NOTE(review): not read anywhere in this file; confirm it is consumed elsewhere.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,

    // Warehouse location. Required for every non-REST catalog; for URL-shaped values the
    // host is used as the storage bucket.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    // Catalog name; `IcebergCommon::catalog_name` defaults it to `risingwave`.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    // Catalog service URI (required by the `rest_rust` catalog).
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    // OAuth2 client credential forwarded to REST catalogs as `credential`.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    // Bearer token forwarded to REST catalogs as `token`.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    // OAuth2 token endpoint forwarded to REST catalogs as `oauth2-server-uri`.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    // OAuth2 scope forwarded to REST catalogs as `scope`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    // SigV4 signing region for the JNI REST catalog (`rest.signing-region`).
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    // SigV4 signing service name for the JNI REST catalog (`rest.signing-name`).
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    // Enables SigV4 signing for the JNI REST catalog; when set, the S3 keys are also passed
    // as the signing credentials.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    // Use path-style instead of virtual-hosted-style S3 addressing.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    // Allow loading credentials/config from the environment (default credential chain).
    // Forced to `false` by `IcebergCommon::enable_config_load` when default credentials are
    // disabled for the deployment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    // NOTE(review): not read anywhere in this file; confirm it is consumed elsewhere.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    // Extra catalog HTTP headers as `;`-separated `key=value` pairs (parsed by `headers()`).
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    // Request vended credentials from a REST catalog; this also switches `rest` to the
    // `rest_rust` implementation.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    // Authentication scheme for the JNI REST catalog: `none`, `oauth2` or `google`
    // (case-insensitive).
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    // OAuth scopes used with `catalog.security = google`.
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    // Java FileIO implementation class for JNI catalogs; defaults to
    // `org.apache.iceberg.aws.s3.S3FileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
197
// Object-cache size assumed per table when sizing the shared registry below (32 MiB).
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 << 20;
// Total memory budget for all shared per-table object caches (512 MiB).
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 << 20;
// How many tables' object caches the shared registry may hold at once (budget / per-table).
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;
204
205impl EnforceSecret for IcebergCommon {
206 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
207 "s3.access.key",
208 "s3.secret.key",
209 "gcs.credential",
210 "catalog.credential",
211 "catalog.token",
212 "catalog.oauth2_server_uri",
213 "adlsgen2.account_key",
214 "adlsgen2.client_secret",
215 "glue.access.key",
216 "glue.secret.key",
217 };
218}
219
// Identifies a single Iceberg table by optional database (namespace) and table name.
//
// Both names may contain `.`: every dot-separated segment becomes its own component, so
// `database.name = 'a.b'` + `table.name = 'c'` and `table.name = 'a.b.c'` name the same table
// (see `IcebergTableIdentifier::to_table_ident`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    // Optional namespace; may be nested (`a.b.c`).
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    // Table name; may itself carry leading namespace segments (`ns.table`).
    #[serde(rename = "table.name")]
    pub table_name: String,
}
231
232impl IcebergTableIdentifier {
233 pub fn database_name(&self) -> Option<&str> {
234 self.database_name.as_deref()
235 }
236
237 pub fn table_name(&self) -> &str {
238 &self.table_name
239 }
240
241 fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
242 let mut parts = Vec::new();
243 if let Some(database_name) = &self.database_name {
244 parts.extend(database_name.split('.'));
245 }
246 parts.extend(self.table_name.split('.'));
247
248 if parts.iter().any(|part| part.is_empty()) {
249 bail!(
250 "Invalid iceberg table identifier '{}': identifier parts must not be empty",
251 self.full_identifier()
252 );
253 }
254
255 Ok(parts)
256 }
257
258 fn full_identifier(&self) -> String {
259 match &self.database_name {
260 Some(database_name) => format!("{}.{}", database_name, self.table_name),
261 None => self.table_name.clone(),
262 }
263 }
264
265 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
266 let ret = TableIdent::from_strs(self.identifier_parts()?);
267
268 Ok(ret.context("Failed to create table identifier")?)
269 }
270
271 pub fn validate(&self) -> ConnectorResult<()> {
272 self.identifier_parts().map(|_| ())
273 }
274}
275
276impl IcebergCommon {
277 pub fn catalog_type(&self) -> &str {
278 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
279 if self.vended_credentials() && catalog_type == "rest" {
280 "rest_rust"
281 } else {
282 catalog_type
283 }
284 }
285
286 pub fn vended_credentials(&self) -> bool {
287 self.vended_credentials.unwrap_or(false)
288 }
289
290 fn glue_access_key(&self) -> Option<&str> {
291 self.glue_access_key
292 .as_deref()
293 .or(self.s3_access_key.as_deref())
294 }
295
296 fn glue_secret_key(&self) -> Option<&str> {
297 self.glue_secret_key
298 .as_deref()
299 .or(self.s3_secret_key.as_deref())
300 }
301
302 fn glue_region(&self) -> Option<&str> {
303 self.glue_region.as_deref().or(self.s3_region.as_deref())
304 }
305
306 pub fn catalog_name(&self) -> String {
307 self.catalog_name
308 .as_ref()
309 .cloned()
310 .unwrap_or_else(|| "risingwave".to_owned())
311 }
312
313 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
314 let mut headers = HashMap::new();
315 let user_agent = match Deployment::current() {
316 Deployment::Ci => "RisingWave(CI)".to_owned(),
317 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
318 Deployment::Other => "RisingWave(OSS)".to_owned(),
319 };
320 if self.vended_credentials() {
321 headers.insert(
322 "X-Iceberg-Access-Delegation".to_owned(),
323 "vended-credentials".to_owned(),
324 );
325 }
326 headers.insert("User-Agent".to_owned(), user_agent);
327 if let Some(header) = &self.catalog_header {
328 for pair in header.split(';') {
329 let mut parts = pair.split('=');
330 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
331 headers.insert(key.to_owned(), value.to_owned());
332 } else {
333 bail!("Invalid header format: {}", pair);
334 }
335 }
336 }
337 Ok(headers)
338 }
339
340 pub fn enable_config_load(&self) -> bool {
341 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
343 if matches!(self.enable_config_load, Some(true)) {
344 tracing::warn!(
345 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
346 );
347 }
348 return false;
349 }
350 self.enable_config_load.unwrap_or(false)
351 }
352
353 fn build_jni_catalog_configs(
355 &self,
356 java_catalog_props: &HashMap<String, String>,
357 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
358 let mut iceberg_configs = HashMap::new();
359 let enable_config_load = self.enable_config_load();
360 let file_io_props = {
361 let catalog_type = self.catalog_type().to_owned();
362
363 if let Some(region) = &self.s3_region {
364 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
366 }
367
368 if let Some(endpoint) = &self.s3_endpoint {
369 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
371 }
372
373 if let Some(access_key) = &self.s3_access_key {
375 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
376 }
377 if let Some(secret_key) = &self.s3_secret_key {
378 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
379 }
380 if let Some(role_arn) = &self.s3_iam_role_arn {
381 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
382 }
383 if let Some(gcs_credential) = &self.gcs_credential {
384 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
385 if catalog_type != "rest" && catalog_type != "rest_rust" {
386 bail!("gcs unsupported in {} catalog", &catalog_type);
387 }
388 }
389
390 if let (
391 Some(azblob_account_name),
392 Some(azblob_account_key),
393 Some(azblob_endpoint_url),
394 ) = (
395 &self.azblob_account_name,
396 &self.azblob_account_key,
397 &self.azblob_endpoint_url,
398 ) {
399 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
400 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
401 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
402
403 if catalog_type != "rest" && catalog_type != "rest_rust" {
404 bail!("azblob unsupported in {} catalog", &catalog_type);
405 }
406 }
407
408 if let (Some(account_name), Some(account_key)) = (
409 self.adlsgen2_account_name.as_ref(),
410 self.adlsgen2_account_key.as_ref(),
411 ) {
412 iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
413 iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
414 if catalog_type != "rest" && catalog_type != "rest_rust" {
415 bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
416 }
417 }
418
419 match &self.warehouse_path {
420 Some(warehouse_path) => {
421 let (bucket, _) = {
422 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
423 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
425 let url = Url::parse(warehouse_path);
426 if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
427 && (catalog_type == "rest" || catalog_type == "rest_rust")
428 {
429 (None, None)
435 } else {
436 let url = url.with_context(|| {
437 format!("Invalid warehouse path: {}", warehouse_path)
438 })?;
439 let bucket = url
440 .host_str()
441 .with_context(|| {
442 format!(
443 "Invalid s3 path: {}, bucket is missing",
444 warehouse_path
445 )
446 })?
447 .to_owned();
448 let root = url.path().trim_start_matches('/').to_owned();
449 (Some(bucket), Some(root))
450 }
451 };
452
453 if let Some(bucket) = bucket {
454 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
455 }
456 }
457 None => {
458 if catalog_type != "rest" && catalog_type != "rest_rust" {
459 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
460 }
461 }
462 }
463 iceberg_configs.insert(
464 S3_DISABLE_CONFIG_LOAD.to_owned(),
465 (!enable_config_load).to_string(),
466 );
467
468 iceberg_configs.insert(
469 GCS_DISABLE_CONFIG_LOAD.to_owned(),
470 (!enable_config_load).to_string(),
471 );
472
473 if let Some(path_style_access) = self.s3_path_style_access {
474 iceberg_configs.insert(
475 S3_PATH_STYLE_ACCESS.to_owned(),
476 path_style_access.to_string(),
477 );
478 }
479
480 iceberg_configs
481 };
482
483 let mut java_catalog_configs = HashMap::new();
485 {
486 if let Some(uri) = self.catalog_uri.as_deref() {
487 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
488 }
489
490 if let Some(warehouse_path) = &self.warehouse_path {
491 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
492 }
493 java_catalog_configs.extend(java_catalog_props.clone());
494
495 let io_impl = self
497 .catalog_io_impl
498 .clone()
499 .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
500 java_catalog_configs.insert("io-impl".to_owned(), io_impl);
501
502 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
504
505 if let Some(region) = &self.s3_region {
506 java_catalog_configs.insert("client.region".to_owned(), region.clone());
507 }
508 if let Some(endpoint) = &self.s3_endpoint {
509 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
510 }
511
512 if let Some(access_key) = &self.s3_access_key {
513 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
514 }
515 if let Some(secret_key) = &self.s3_secret_key {
516 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
517 }
518
519 if let Some(path_style_access) = &self.s3_path_style_access {
520 java_catalog_configs.insert(
521 "s3.path-style-access".to_owned(),
522 path_style_access.to_string(),
523 );
524 }
525
526 let headers = self.headers()?;
527 for (header_name, header_value) in headers {
528 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
529 }
530
531 match self.catalog_type() {
532 "rest" => {
533 if let Some(security) = &self.catalog_security {
535 match security.to_lowercase().as_str() {
536 "google" => {
537 java_catalog_configs.insert(
539 "rest.auth.type".to_owned(),
540 "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
541 );
542 if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
544 java_catalog_configs.insert(
545 "gcp.auth.scopes".to_owned(),
546 gcp_auth_scopes.clone(),
547 );
548 }
549 }
550 "oauth2" => {
551 if let Some(credential) = &self.catalog_credential {
553 java_catalog_configs
554 .insert("credential".to_owned(), credential.clone());
555 }
556 if let Some(token) = &self.catalog_token {
557 java_catalog_configs.insert("token".to_owned(), token.clone());
558 }
559 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
560 java_catalog_configs.insert(
561 "oauth2-server-uri".to_owned(),
562 oauth2_server_uri.clone(),
563 );
564 }
565 if let Some(scope) = &self.catalog_scope {
566 java_catalog_configs.insert("scope".to_owned(), scope.clone());
567 }
568 }
569 "none" | "" => {
570 }
572 _ => {
573 tracing::warn!(
574 "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
575 security
576 );
577 }
578 }
579 } else {
580 if let Some(credential) = &self.catalog_credential {
582 java_catalog_configs
583 .insert("credential".to_owned(), credential.clone());
584 }
585 if let Some(token) = &self.catalog_token {
586 java_catalog_configs.insert("token".to_owned(), token.clone());
587 }
588 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
589 java_catalog_configs
590 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
591 }
592 if let Some(scope) = &self.catalog_scope {
593 java_catalog_configs.insert("scope".to_owned(), scope.clone());
594 }
595 }
596 if let Some(rest_signing_region) = &self.rest_signing_region {
597 java_catalog_configs.insert(
598 "rest.signing-region".to_owned(),
599 rest_signing_region.clone(),
600 );
601 }
602 if let Some(rest_signing_name) = &self.rest_signing_name {
603 java_catalog_configs
604 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
605 }
606 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
607 java_catalog_configs.insert(
608 "rest.sigv4-enabled".to_owned(),
609 rest_sigv4_enabled.to_string(),
610 );
611
612 if let Some(access_key) = &self.s3_access_key {
613 java_catalog_configs
614 .insert("rest.access-key-id".to_owned(), access_key.clone());
615 }
616
617 if let Some(secret_key) = &self.s3_secret_key {
618 java_catalog_configs
619 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
620 }
621 }
622 }
623 "glue" => {
624 let glue_access_key = self.glue_access_key();
625 let glue_secret_key = self.glue_secret_key();
626 let has_glue_credentials =
627 glue_access_key.is_some() && glue_secret_key.is_some();
628 let should_configure_glue_provider = !enable_config_load
629 || has_glue_credentials
630 || self.glue_iam_role_arn.is_some();
631
632 if should_configure_glue_provider {
633 java_catalog_configs.insert(
634 "client.credentials-provider".to_owned(),
635 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
636 );
637 if let Some(region) = self.glue_region() {
638 java_catalog_configs.insert(
639 "client.credentials-provider.glue.region".to_owned(),
640 region.to_owned(),
641 );
642 }
643 if let Some(access_key) = glue_access_key {
644 java_catalog_configs.insert(
645 "client.credentials-provider.glue.access-key-id".to_owned(),
646 access_key.to_owned(),
647 );
648 }
649 if let Some(secret_key) = glue_secret_key {
650 java_catalog_configs.insert(
651 "client.credentials-provider.glue.secret-access-key".to_owned(),
652 secret_key.to_owned(),
653 );
654 }
655 if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
656 java_catalog_configs.insert(
657 "client.credentials-provider.glue.iam-role-arn".to_owned(),
658 role_arn.to_owned(),
659 );
660 }
661 if enable_config_load && !has_glue_credentials {
662 java_catalog_configs.insert(
663 "client.credentials-provider.glue.use-default-credential-chain"
664 .to_owned(),
665 "true".to_owned(),
666 );
667 }
668 }
669
670 if let Some(region) = self.glue_region() {
671 java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
672 java_catalog_configs.insert(
673 "glue.endpoint".to_owned(),
674 format!("https://glue.{}.amazonaws.com", region),
675 );
676 }
677
678 if let Some(glue_id) = self.glue_id.as_deref() {
679 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
680 }
681 }
682 "jdbc" => {
683 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
684 java_catalog_configs
685 .insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
686 java_catalog_configs.insert(
687 "client.factory".to_owned(),
688 "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
689 );
690 if let Some(region) = &self.s3_region {
691 java_catalog_configs
692 .insert("client.assume-role.region".to_owned(), region.clone());
693 }
694 }
695 }
696 _ => {}
697 }
698 }
699
700 Ok((file_io_props, java_catalog_configs))
701 }
702}
703
704impl IcebergCommon {
705 pub async fn create_catalog(
707 &self,
708 java_catalog_props: &HashMap<String, String>,
709 ) -> ConnectorResult<Arc<dyn Catalog>> {
710 match self.catalog_type() {
711 "storage" => {
712 let warehouse = self
713 .warehouse_path
714 .clone()
715 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
716 let url = Url::parse(warehouse.as_ref())
717 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
718
719 let config = match url.scheme() {
720 "s3" | "s3a" => StorageCatalogConfig::S3(
721 storage_catalog::StorageCatalogS3Config::builder()
722 .warehouse(warehouse)
723 .access_key(self.s3_access_key.clone())
724 .secret_key(self.s3_secret_key.clone())
725 .region(self.s3_region.clone())
726 .endpoint(self.s3_endpoint.clone())
727 .path_style_access(self.s3_path_style_access)
728 .enable_config_load(Some(self.enable_config_load()))
729 .build(),
730 ),
731 "gs" | "gcs" => StorageCatalogConfig::Gcs(
732 storage_catalog::StorageCatalogGcsConfig::builder()
733 .warehouse(warehouse)
734 .credential(self.gcs_credential.clone())
735 .enable_config_load(Some(self.enable_config_load()))
736 .build(),
737 ),
738 "azblob" => StorageCatalogConfig::Azblob(
739 storage_catalog::StorageCatalogAzblobConfig::builder()
740 .warehouse(warehouse)
741 .account_name(self.azblob_account_name.clone())
742 .account_key(self.azblob_account_key.clone())
743 .endpoint(self.azblob_endpoint_url.clone())
744 .build(),
745 ),
746 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
747 };
748
749 let catalog = storage_catalog::StorageCatalog::new(config)?;
750 Ok(Arc::new(catalog))
751 }
752 "rest_rust" => {
753 let mut iceberg_configs = HashMap::new();
754
755 if let Some(gcs_credential) = &self.gcs_credential {
757 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
758 } else {
759 if let Some(region) = &self.s3_region {
760 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
761 }
762 if let Some(endpoint) = &self.s3_endpoint {
763 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
764 }
765 if let Some(access_key) = &self.s3_access_key {
766 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
767 }
768 if let Some(secret_key) = &self.s3_secret_key {
769 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
770 }
771 if let Some(path_style_access) = &self.s3_path_style_access {
772 iceberg_configs.insert(
773 S3_PATH_STYLE_ACCESS.to_owned(),
774 path_style_access.to_string(),
775 );
776 }
777 };
778
779 if let Some(credential) = &self.catalog_credential {
780 iceberg_configs.insert("credential".to_owned(), credential.clone());
781 }
782 if let Some(token) = &self.catalog_token {
783 iceberg_configs.insert("token".to_owned(), token.clone());
784 }
785 if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
786 iceberg_configs
787 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
788 }
789 if let Some(scope) = &self.catalog_scope {
790 iceberg_configs.insert("scope".to_owned(), scope.clone());
791 }
792
793 let headers = self.headers()?;
794 for (header_name, header_value) in headers {
795 iceberg_configs.insert(format!("header.{}", header_name), header_value);
796 }
797
798 iceberg_configs.insert(
799 iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
800 self.catalog_uri
801 .clone()
802 .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
803 );
804 if let Some(warehouse_path) = &self.warehouse_path {
805 iceberg_configs.insert(
806 iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
807 warehouse_path.clone(),
808 );
809 }
810 let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
811 .load("rest", iceberg_configs)
812 .await
813 .map_err(|e| anyhow!(IcebergError::from(e)))?;
814 Ok(Arc::new(catalog))
815 }
816 "glue_rust" => {
817 let mut iceberg_configs = HashMap::new();
818 if let Some(region) = self.glue_region() {
820 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
821 }
822 if let Some(access_key) = self.glue_access_key() {
823 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
824 }
825 if let Some(secret_key) = self.glue_secret_key() {
826 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
827 }
828 if let Some(region) = &self.s3_region {
830 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
831 }
832 if let Some(endpoint) = &self.s3_endpoint {
833 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
834 }
835 if let Some(access_key) = &self.s3_access_key {
836 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
837 }
838 if let Some(secret_key) = &self.s3_secret_key {
839 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
840 }
841 if let Some(role_arn) = &self.s3_iam_role_arn {
842 iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
843 }
844 if let Some(path_style_access) = &self.s3_path_style_access {
845 iceberg_configs.insert(
846 S3_PATH_STYLE_ACCESS.to_owned(),
847 path_style_access.to_string(),
848 );
849 }
850 iceberg_configs.insert(
851 iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
852 self.warehouse_path
853 .clone()
854 .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
855 );
856 if let Some(uri) = self.catalog_uri.as_deref() {
857 iceberg_configs.insert(
858 iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
859 uri.to_owned(),
860 );
861 }
862 let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
863 .load("glue", iceberg_configs)
864 .await
865 .map_err(|e| anyhow!(IcebergError::from(e)))?;
866 Ok(Arc::new(catalog))
867 }
868 catalog_type
869 if catalog_type == "hive"
870 || catalog_type == "snowflake"
871 || catalog_type == "jdbc"
872 || catalog_type == "rest"
873 || catalog_type == "glue" =>
874 {
875 let (file_io_props, java_catalog_props) =
877 self.build_jni_catalog_configs(java_catalog_props)?;
878 let catalog_impl = match catalog_type {
879 "hive" => "org.apache.iceberg.hive.HiveCatalog",
880 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
881 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
882 "rest" => "org.apache.iceberg.rest.RESTCatalog",
883 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
884 _ => unreachable!(),
885 };
886
887 jni_catalog::JniCatalog::build_catalog(
888 file_io_props,
889 self.catalog_name(),
890 catalog_impl,
891 java_catalog_props,
892 )
893 }
894 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
895 _ => {
896 bail!(
897 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
898 self.catalog_type()
899 )
900 }
901 }
902 }
903
904 pub async fn load_table(
906 &self,
907 table: &IcebergTableIdentifier,
908 java_catalog_props: &HashMap<String, String>,
909 ) -> ConnectorResult<Table> {
910 let catalog = self
911 .create_catalog(java_catalog_props)
912 .await
913 .context("Unable to load iceberg catalog")?;
914
915 let table_id = table
916 .to_table_ident()
917 .context("Unable to parse table name")?;
918
919 let table = catalog.load_table(&table_id).await?;
920 Ok(rebuild_table_with_shared_cache(table).await)
921 }
922}
923
924pub(crate) async fn shared_object_cache(
926 init_object_cache: Arc<ObjectCache>,
927 table_uuid: Uuid,
928) -> Arc<ObjectCache> {
929 static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
930 MokaCache::builder()
931 .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
932 .build()
933 });
934
935 CACHE
936 .get_with(table_uuid, async { init_object_cache })
937 .await
938}
939
940pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
941 let table_uuid = table.metadata().uuid();
942 let init_object_cache = table.object_cache();
943 let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
944 table.with_object_cache(object_cache)
945}
946
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for test identifiers.
    fn ident(database_name: Option<&str>, table_name: &str) -> IcebergTableIdentifier {
        IcebergTableIdentifier {
            database_name: database_name.map(str::to_owned),
            table_name: table_name.to_owned(),
        }
    }

    /// Asserts that validation fails with the empty-component error.
    fn assert_empty_part_error(identifier: IcebergTableIdentifier) {
        let err = identifier
            .validate()
            .expect_err("identifier with an empty part must be rejected");
        assert!(
            err.to_string()
                .contains("identifier parts must not be empty")
        );
    }

    #[test]
    fn test_iceberg_table_identifier_validation() {
        let valid_cases = [
            (Some("valid_db"), "test_table"),
            (Some("valid_db_name"), "test_table"),
            (None, "test_table"),
        ];
        for (database_name, table_name) in valid_cases {
            assert!(ident(database_name, table_name).validate().is_ok());
        }

        assert_empty_part_error(ident(Some("a..b"), "test_table"));
        assert_empty_part_error(ident(None, ".test_table"));
    }

    #[test]
    fn test_iceberg_table_identifier_dots_as_namespace_separators() {
        let cases = [
            (Some("general.zia.stats"), "tagged_security_transactions"),
            (Some("general"), "zia.stats.tagged_security_transactions"),
            (None, "general.zia.stats.tagged_security_transactions"),
        ];
        for (database_name, table_name) in cases {
            let table_ident = ident(database_name, table_name).to_table_ident().unwrap();
            let namespace: Vec<&str> = table_ident
                .namespace()
                .as_ref()
                .iter()
                .map(String::as_str)
                .collect();
            assert_eq!(namespace, vec!["general", "zia", "stats"]);
            assert_eq!(table_ident.name(), "tagged_security_transactions");
        }
    }
}