1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::{Arc, LazyLock};
22
23use ::iceberg::io::{
24 S3_ACCESS_KEY_ID, S3_ASSUME_ROLE_ARN, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use ::iceberg::table::Table;
27use ::iceberg::{Catalog, CatalogBuilder, TableIdent};
28use anyhow::{Context, anyhow};
29use iceberg::io::object_cache::ObjectCache;
30use iceberg::io::{
31 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET,
32 ADLS_TENANT_ID, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT, GCS_CREDENTIALS_JSON,
33 GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
34};
35use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
36use moka::future::Cache as MokaCache;
37use phf::{Set, phf_set};
38use risingwave_common::bail;
39use risingwave_common::error::IcebergError;
40use risingwave_common::util::deployment::Deployment;
41use risingwave_common::util::env_var::env_var_is_true;
42use serde::Deserialize;
43use serde_with::serde_as;
44use url::Url;
45use uuid::Uuid;
46use with_options::WithOptions;
47
48use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
49use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
50use crate::deserialize_optional_bool_from_string;
51use crate::enforce_secret::EnforceSecret;
52use crate::error::ConnectorResult;
53
/// Connection options shared by the Iceberg source and sink.
///
/// Every field is deserialized from a user-facing `WITH` option whose key is given by the
/// `#[serde(rename = ...)]` attribute (or the field name when no rename is present).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
    /// Catalog type. Treated as `storage` when unset; the values dispatched on by
    /// `create_catalog` are `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`,
    /// `rest_rust`, `glue_rust` and `mock`.
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// S3 region. Also used as the fallback for `glue.region`.
    #[serde(rename = "s3.region")]
    pub s3_region: Option<String>,
    /// S3 endpoint.
    #[serde(rename = "s3.endpoint")]
    pub s3_endpoint: Option<String>,
    /// S3 access key id. Also used as the fallback for `glue.access.key`.
    #[serde(rename = "s3.access.key")]
    pub s3_access_key: Option<String>,
    /// S3 secret access key. Also used as the fallback for `glue.secret.key`.
    #[serde(rename = "s3.secret.key")]
    pub s3_secret_key: Option<String>,
    /// IAM role ARN to assume for S3 access.
    #[serde(rename = "s3.iam_role_arn")]
    pub s3_iam_role_arn: Option<String>,

    /// Access key for the Glue catalog; falls back to `s3.access.key` when unset.
    #[serde(rename = "glue.access.key")]
    pub glue_access_key: Option<String>,
    /// Secret key for the Glue catalog; falls back to `s3.secret.key` when unset.
    #[serde(rename = "glue.secret.key")]
    pub glue_secret_key: Option<String>,
    /// IAM role ARN handed to the Glue credential provider (no S3 fallback).
    #[serde(rename = "glue.iam_role_arn")]
    pub glue_iam_role_arn: Option<String>,
    /// Region of the Glue catalog; falls back to `s3.region` when unset.
    #[serde(rename = "glue.region")]
    pub glue_region: Option<String>,
    /// Forwarded verbatim to the Java Glue catalog as `glue.id`.
    #[serde(rename = "glue.id")]
    pub glue_id: Option<String>,

    /// GCS credential, forwarded to the file IO as the GCS credentials JSON property.
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Azure Blob account name.
    #[serde(rename = "azblob.account_name")]
    pub azblob_account_name: Option<String>,
    /// Azure Blob account key.
    #[serde(rename = "azblob.account_key")]
    pub azblob_account_key: Option<String>,
    /// Azure Blob endpoint URL.
    #[serde(rename = "azblob.endpoint_url")]
    pub azblob_endpoint_url: Option<String>,

    /// ADLS Gen2 account name (shared-key auth).
    #[serde(rename = "adlsgen2.account_name")]
    pub adlsgen2_account_name: Option<String>,
    /// ADLS Gen2 account key (shared-key auth). Mutually exclusive with the
    /// service-principal fields below.
    #[serde(rename = "adlsgen2.account_key")]
    pub adlsgen2_account_key: Option<String>,
    /// ADLS Gen2 endpoint.
    // NOTE(review): not referenced by the code visible in this file section — confirm usage.
    #[serde(rename = "adlsgen2.endpoint")]
    pub adlsgen2_endpoint: Option<String>,
    /// ADLS Gen2 tenant id (service-principal auth).
    #[serde(rename = "adlsgen2.tenant_id")]
    pub adlsgen2_tenant_id: Option<String>,
    /// ADLS Gen2 client id (service-principal auth).
    #[serde(rename = "adlsgen2.client_id")]
    pub adlsgen2_client_id: Option<String>,
    /// ADLS Gen2 client secret (service-principal auth).
    #[serde(rename = "adlsgen2.client_secret")]
    pub adlsgen2_client_secret: Option<String>,
    /// Optional authority host for service-principal auth. Must be a bare `https` origin;
    /// defaults to `ADLS_DEFAULT_AUTHORITY_HOST` when unset.
    #[serde(rename = "adlsgen2.authority_host")]
    pub adlsgen2_authority_host: Option<String>,

    /// Warehouse location. Required for every catalog type except `rest` / `rest_rust`.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name; `risingwave` is used when unset.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// Catalog URI. Required by the `rest_rust` catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Forwarded to the REST catalog as `credential`.
    #[serde(rename = "catalog.credential")]
    pub catalog_credential: Option<String>,
    /// Forwarded to the REST catalog as `token`.
    #[serde(rename = "catalog.token")]
    pub catalog_token: Option<String>,
    /// Forwarded to the REST catalog as `oauth2-server-uri`.
    #[serde(rename = "catalog.oauth2_server_uri")]
    pub catalog_oauth2_server_uri: Option<String>,
    /// Forwarded to the REST catalog as `scope`.
    #[serde(rename = "catalog.scope")]
    pub catalog_scope: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-region`.
    #[serde(rename = "catalog.rest.signing_region")]
    pub rest_signing_region: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.signing-name`.
    #[serde(rename = "catalog.rest.signing_name")]
    pub rest_signing_name: Option<String>,

    /// Forwarded to the Java REST catalog as `rest.sigv4-enabled`. When set, the S3
    /// access/secret keys are additionally passed as `rest.access-key-id` /
    /// `rest.secret-access-key`.
    #[serde(
        rename = "catalog.rest.sigv4_enabled",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub rest_sigv4_enabled: Option<bool>,

    /// Whether to use path-style access for S3.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub s3_path_style_access: Option<bool>,
    /// Whether credentials may be loaded from the environment. Defaults to `false`, and is
    /// forced to `false` when the `DISABLE_DEFAULT_CREDENTIAL` environment variable is true.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,

    /// Hosted-catalog flag.
    // NOTE(review): not referenced by the code visible in this file section — confirm usage.
    #[serde(
        rename = "hosted_catalog",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub hosted_catalog: Option<bool>,

    /// Extra catalog HTTP headers, written as `key=value` pairs separated by `;`.
    #[serde(rename = "catalog.header")]
    pub catalog_header: Option<String>,

    /// Enables vended credentials: a `rest` catalog is then served by the `rest_rust`
    /// implementation and the `X-Iceberg-Access-Delegation: vended-credentials` header is
    /// added to catalog requests.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub vended_credentials: Option<bool>,

    /// Security mode for the Java REST catalog, matched case-insensitively against
    /// `none`, `oauth2` and `google`. Unknown values only produce a warning.
    #[serde(rename = "catalog.security")]
    pub catalog_security: Option<String>,

    /// Forwarded as `gcp.auth.scopes`; only consulted when `catalog.security` is `google`.
    #[serde(rename = "gcp.auth.scopes")]
    pub gcp_auth_scopes: Option<String>,

    /// Java `io-impl` class for the JNI catalog; defaults to
    /// `org.apache.iceberg.aws.s3.S3FileIO`.
    #[serde(rename = "catalog.io_impl")]
    pub catalog_io_impl: Option<String>,
}
212
// 32 MiB. Presumably the default size of one table's `ObjectCache` — TODO confirm
// against the iceberg crate.
const DEFAULT_OBJECT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024;
// 512 MiB overall budget used to derive how many per-table caches are kept alive.
const SHARED_OBJECT_CACHE_BUDGET_BYTES: u64 = 512 * 1024 * 1024;
// Budget divided by the per-table size (= 16); used as the `max_capacity` of the
// process-wide cache in `shared_object_cache`.
const SHARED_OBJECT_CACHE_MAX_TABLES: u64 =
    SHARED_OBJECT_CACHE_BUDGET_BYTES / DEFAULT_OBJECT_CACHE_SIZE_BYTES;

// Authority host used for ADLS Gen2 service-principal auth when
// `adlsgen2.authority_host` is not set.
const ADLS_DEFAULT_AUTHORITY_HOST: &str = "https://login.microsoftonline.com";
223
impl EnforceSecret for IcebergCommon {
    // Option keys (as written by the user, i.e. the serde `rename` values) that this
    // type declares to the `EnforceSecret` trait.
    // NOTE(review): `azblob.account_key` is not listed although the other account keys
    // and client secrets are — confirm whether that omission is intentional.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.access.key",
        "s3.secret.key",
        "gcs.credential",
        "catalog.credential",
        "catalog.token",
        "catalog.oauth2_server_uri",
        "adlsgen2.account_key",
        "adlsgen2.client_secret",
        "glue.access.key",
        "glue.secret.key",
    };
}
238
/// Identifies an Iceberg table by an optional database name plus a table name.
///
/// Unknown option keys are rejected during deserialization (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergTableIdentifier {
    /// Optional database name. It is split on `.` when the identifier is built, so it
    /// may contain several levels.
    #[serde(rename = "database.name")]
    pub database_name: Option<String>,
    /// Table name. Also split on `.` when the identifier is built.
    #[serde(rename = "table.name")]
    pub table_name: String,
}
250
251impl IcebergTableIdentifier {
252 pub fn database_name(&self) -> Option<&str> {
253 self.database_name.as_deref()
254 }
255
256 pub fn table_name(&self) -> &str {
257 &self.table_name
258 }
259
260 fn identifier_parts(&self) -> ConnectorResult<Vec<&str>> {
261 let mut parts = Vec::new();
262 if let Some(database_name) = &self.database_name {
263 parts.extend(database_name.split('.'));
264 }
265 parts.extend(self.table_name.split('.'));
266
267 if parts.iter().any(|part| part.is_empty()) {
268 bail!(
269 "Invalid iceberg table identifier '{}': identifier parts must not be empty",
270 self.full_identifier()
271 );
272 }
273
274 Ok(parts)
275 }
276
277 fn full_identifier(&self) -> String {
278 match &self.database_name {
279 Some(database_name) => format!("{}.{}", database_name, self.table_name),
280 None => self.table_name.clone(),
281 }
282 }
283
284 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
285 let ret = TableIdent::from_strs(self.identifier_parts()?);
286
287 Ok(ret.context("Failed to create table identifier")?)
288 }
289
290 pub fn validate(&self) -> ConnectorResult<()> {
291 self.identifier_parts().map(|_| ())
292 }
293}
294
295impl IcebergCommon {
296 pub fn catalog_type(&self) -> &str {
297 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
298 if self.vended_credentials() && catalog_type == "rest" {
299 "rest_rust"
300 } else {
301 catalog_type
302 }
303 }
304
305 pub fn vended_credentials(&self) -> bool {
306 self.vended_credentials.unwrap_or(false)
307 }
308
309 fn glue_access_key(&self) -> Option<&str> {
310 self.glue_access_key
311 .as_deref()
312 .or(self.s3_access_key.as_deref())
313 }
314
315 fn glue_secret_key(&self) -> Option<&str> {
316 self.glue_secret_key
317 .as_deref()
318 .or(self.s3_secret_key.as_deref())
319 }
320
321 fn glue_region(&self) -> Option<&str> {
322 self.glue_region.as_deref().or(self.s3_region.as_deref())
323 }
324
325 pub fn catalog_name(&self) -> String {
326 self.catalog_name
327 .as_ref()
328 .cloned()
329 .unwrap_or_else(|| "risingwave".to_owned())
330 }
331
332 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
333 let mut headers = HashMap::new();
334 let user_agent = match Deployment::current() {
335 Deployment::Ci => "RisingWave(CI)".to_owned(),
336 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
337 Deployment::Other => "RisingWave(OSS)".to_owned(),
338 };
339 if self.vended_credentials() {
340 headers.insert(
341 "X-Iceberg-Access-Delegation".to_owned(),
342 "vended-credentials".to_owned(),
343 );
344 }
345 headers.insert("User-Agent".to_owned(), user_agent);
346 if let Some(header) = &self.catalog_header {
347 for pair in header.split(';') {
348 let mut parts = pair.split('=');
349 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
350 headers.insert(key.to_owned(), value.to_owned());
351 } else {
352 bail!("Invalid header format: {}", pair);
353 }
354 }
355 }
356 Ok(headers)
357 }
358
359 pub fn enable_config_load(&self) -> bool {
360 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
362 if matches!(self.enable_config_load, Some(true)) {
363 tracing::warn!(
364 "`enable_config_load` can't be enabled in SaaS environment, the behavior might be unexpected"
365 );
366 }
367 return false;
368 }
369 self.enable_config_load.unwrap_or(false)
370 }
371
/// Builds the two property maps needed to construct a JNI (Java) catalog.
///
/// Returns `(file_io_props, java_catalog_configs)`:
/// * `file_io_props` carries the storage (S3 / GCS / Azure Blob / ADLS Gen2) settings
///   keyed by the iceberg file-IO property constants.
/// * `java_catalog_configs` carries the properties passed to the Java catalog
///   implementation, seeded with `java_catalog_props`.
///
/// # Errors
/// Fails when a non-S3 backend is configured for a catalog other than `rest`, when the
/// ADLS Gen2 auth options are inconsistent or the authority host is not a bare `https`
/// origin, when `warehouse.path` is missing or unparsable for a non-REST catalog, or
/// when `catalog.header` is malformed.
fn build_jni_catalog_configs(
    &self,
    java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
    let mut iceberg_configs = HashMap::new();
    let enable_config_load = self.enable_config_load();
    let file_io_props = {
        let catalog_type = self.catalog_type().to_owned();

        // Guard used by the GCS / Azure Blob / ADLS Gen2 branches below: those backends
        // are only accepted when the catalog type is exactly `rest`.
        let require_rest = |backend: &str| -> ConnectorResult<()> {
            if catalog_type != "rest" {
                bail!("{} unsupported in {} catalog", backend, &catalog_type);
            }
            Ok(())
        };

        if let Some(region) = &self.s3_region {
            iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
        }

        if let Some(endpoint) = &self.s3_endpoint {
            iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
        }

        if let Some(access_key) = &self.s3_access_key {
            iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
        }
        if let Some(secret_key) = &self.s3_secret_key {
            iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
        }
        if let Some(role_arn) = &self.s3_iam_role_arn {
            iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
        }
        if let Some(gcs_credential) = &self.gcs_credential {
            iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
            require_rest("gcs")?;
        }

        // Azure Blob settings are only applied when all three options are present;
        // a partial set is silently ignored.
        if let (
            Some(azblob_account_name),
            Some(azblob_account_key),
            Some(azblob_endpoint_url),
        ) = (
            &self.azblob_account_name,
            &self.azblob_account_key,
            &self.azblob_endpoint_url,
        ) {
            iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
            iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
            iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());

            require_rest("azblob")?;
        }

        // Treats a value that is empty or whitespace-only the same as an unset option.
        fn nonempty(v: &Option<String>) -> Option<&str> {
            v.as_deref().filter(|s| !s.trim().is_empty())
        }
        // `sp_*` = service-principal auth fields, `sk_*` = shared-key auth fields.
        let sp_tenant = nonempty(&self.adlsgen2_tenant_id);
        let sp_client = nonempty(&self.adlsgen2_client_id);
        let sp_secret = nonempty(&self.adlsgen2_client_secret);
        let sp_authority = nonempty(&self.adlsgen2_authority_host);
        let sk_account_name = nonempty(&self.adlsgen2_account_name);
        let sk_account_key = nonempty(&self.adlsgen2_account_key);
        let any_sp_field = sp_tenant.is_some()
            || sp_client.is_some()
            || sp_secret.is_some()
            || sp_authority.is_some();
        let all_sp_required = sp_tenant.is_some() && sp_client.is_some() && sp_secret.is_some();

        // The two ADLS Gen2 auth modes are mutually exclusive.
        if sk_account_key.is_some() && any_sp_field {
            bail!(
                "adlsgen2: cannot configure both shared-key auth \
                (adlsgen2.account_key) and service-principal auth \
                (adlsgen2.tenant_id / adlsgen2.client_id / adlsgen2.client_secret / \
                adlsgen2.authority_host) simultaneously. Specify exactly one auth mode."
            );
        }
        // A partially specified service principal is rejected rather than ignored.
        if any_sp_field && !all_sp_required {
            bail!(
                "adlsgen2: service-principal auth requires all three of \
                adlsgen2.tenant_id, adlsgen2.client_id, and adlsgen2.client_secret \
                to be set. (adlsgen2.authority_host is optional and defaults to the \
                public Azure AAD endpoint.)"
            );
        }
        // The authority host must be a bare `https://host[:port]` origin: no userinfo,
        // query, fragment or path. The parse error reports only the length of the
        // value, not the value itself.
        if let Some(host) = sp_authority {
            let parsed = Url::parse(host).map_err(|_| {
                anyhow!(
                    "adlsgen2.authority_host does not parse as a URL ({} chars)",
                    host.len()
                )
            })?;
            if parsed.scheme() != "https" {
                bail!(
                    "adlsgen2.authority_host must use the https scheme, got {}",
                    parsed.scheme()
                );
            }
            if !parsed.username().is_empty() || parsed.password().is_some() {
                bail!("adlsgen2.authority_host must not contain userinfo");
            }
            if parsed.query().is_some() || parsed.fragment().is_some() {
                bail!("adlsgen2.authority_host must not contain a query or fragment");
            }
            if !matches!(parsed.path(), "" | "/") {
                bail!("adlsgen2.authority_host must not contain a path component");
            }
        }

        // Shared-key auth: applied only when both the account name and key are set.
        if let (Some(account_name), Some(account_key)) = (sk_account_name, sk_account_key) {
            iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.to_owned());
            iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.to_owned());
            require_rest("adlsgen2")?;
        }

        // Service-principal auth.
        if let (Some(tenant_id), Some(client_id), Some(client_secret)) =
            (sp_tenant, sp_client, sp_secret)
        {
            iceberg_configs.insert(ADLS_TENANT_ID.to_owned(), tenant_id.to_owned());
            iceberg_configs.insert(ADLS_CLIENT_ID.to_owned(), client_id.to_owned());
            iceberg_configs.insert(ADLS_CLIENT_SECRET.to_owned(), client_secret.to_owned());
            // Fall back to the default authority host and drop any trailing `/`.
            let authority_host = sp_authority
                .unwrap_or(ADLS_DEFAULT_AUTHORITY_HOST)
                .trim_end_matches('/')
                .to_owned();
            iceberg_configs.insert(ADLS_AUTHORITY_HOST.to_owned(), authority_host);
            require_rest("adlsgen2")?;
        }

        match &self.warehouse_path {
            Some(warehouse_path) => {
                // Only the bucket (URL host) is used; the root path is computed but
                // discarded.
                let (bucket, _) = {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    // For REST catalogs the warehouse may be something other than a
                    // bucket URL (unparsable, an S3 Tables ARN, or `bq://...`); in that
                    // case no bucket is derived.
                    if (url.is_err() || is_s3_tables || is_bq_catalog_federation)
                        && (catalog_type == "rest" || catalog_type == "rest_rust")
                    {
                        (None, None)
                    } else {
                        let url = url.with_context(|| {
                            format!("Invalid warehouse path: {}", warehouse_path)
                        })?;
                        let bucket = url
                            .host_str()
                            .with_context(|| {
                                format!(
                                    "Invalid s3 path: {}, bucket is missing",
                                    warehouse_path
                                )
                            })?
                            .to_owned();
                        let root = url.path().trim_start_matches('/').to_owned();
                        (Some(bucket), Some(root))
                    }
                };

                if let Some(bucket) = bucket {
                    iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
                }
            }
            None => {
                // Only REST catalogs may omit the warehouse path.
                if catalog_type != "rest" && catalog_type != "rest_rust" {
                    bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
                }
            }
        }
        // The file-IO flags are "disable" switches, hence the negation.
        iceberg_configs.insert(
            S3_DISABLE_CONFIG_LOAD.to_owned(),
            (!enable_config_load).to_string(),
        );

        iceberg_configs.insert(
            GCS_DISABLE_CONFIG_LOAD.to_owned(),
            (!enable_config_load).to_string(),
        );

        if let Some(path_style_access) = self.s3_path_style_access {
            iceberg_configs.insert(
                S3_PATH_STYLE_ACCESS.to_owned(),
                path_style_access.to_string(),
            );
        }

        iceberg_configs
    };

    // Properties for the Java catalog implementation.
    let mut java_catalog_configs = HashMap::new();
    {
        if let Some(uri) = self.catalog_uri.as_deref() {
            java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
        }

        if let Some(warehouse_path) = &self.warehouse_path {
            java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
        }
        // Caller-supplied properties override `uri` / `warehouse` set above, and are in
        // turn overridden by every key inserted below.
        java_catalog_configs.extend(java_catalog_props.clone());

        let io_impl = self
            .catalog_io_impl
            .clone()
            .unwrap_or_else(|| "org.apache.iceberg.aws.s3.S3FileIO".to_owned());
        java_catalog_configs.insert("io-impl".to_owned(), io_impl);

        java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());

        if let Some(region) = &self.s3_region {
            java_catalog_configs.insert("client.region".to_owned(), region.clone());
        }
        if let Some(endpoint) = &self.s3_endpoint {
            java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
        }

        if let Some(access_key) = &self.s3_access_key {
            java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
        }
        if let Some(secret_key) = &self.s3_secret_key {
            java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
        }

        if let Some(path_style_access) = &self.s3_path_style_access {
            java_catalog_configs.insert(
                "s3.path-style-access".to_owned(),
                path_style_access.to_string(),
            );
        }

        // Each catalog header is passed as a `header.<name>` property.
        let headers = self.headers()?;
        for (header_name, header_value) in headers {
            java_catalog_configs.insert(format!("header.{}", header_name), header_value);
        }

        match self.catalog_type() {
            "rest" => {
                if let Some(security) = &self.catalog_security {
                    // Matched case-insensitively.
                    match security.to_lowercase().as_str() {
                        "google" => {
                            java_catalog_configs.insert(
                                "rest.auth.type".to_owned(),
                                "org.apache.iceberg.gcp.auth.GoogleAuthManager".to_owned(),
                            );
                            if let Some(gcp_auth_scopes) = &self.gcp_auth_scopes {
                                java_catalog_configs.insert(
                                    "gcp.auth.scopes".to_owned(),
                                    gcp_auth_scopes.clone(),
                                );
                            }
                        }
                        "oauth2" => {
                            if let Some(credential) = &self.catalog_credential {
                                java_catalog_configs
                                    .insert("credential".to_owned(), credential.clone());
                            }
                            if let Some(token) = &self.catalog_token {
                                java_catalog_configs.insert("token".to_owned(), token.clone());
                            }
                            if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                                java_catalog_configs.insert(
                                    "oauth2-server-uri".to_owned(),
                                    oauth2_server_uri.clone(),
                                );
                            }
                            if let Some(scope) = &self.catalog_scope {
                                java_catalog_configs.insert("scope".to_owned(), scope.clone());
                            }
                        }
                        "none" | "" => {
                            // No authentication properties are added.
                        }
                        _ => {
                            // Unknown values are not an error; they are only logged.
                            tracing::warn!(
                                "Unknown catalog.security value: {}. Supported values: none, oauth2, google",
                                security
                            );
                        }
                    }
                } else {
                    // `catalog.security` unset: forward the same OAuth2-style options as
                    // the explicit `oauth2` mode above.
                    if let Some(credential) = &self.catalog_credential {
                        java_catalog_configs
                            .insert("credential".to_owned(), credential.clone());
                    }
                    if let Some(token) = &self.catalog_token {
                        java_catalog_configs.insert("token".to_owned(), token.clone());
                    }
                    if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                        java_catalog_configs
                            .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
                    }
                    if let Some(scope) = &self.catalog_scope {
                        java_catalog_configs.insert("scope".to_owned(), scope.clone());
                    }
                }
                if let Some(rest_signing_region) = &self.rest_signing_region {
                    java_catalog_configs.insert(
                        "rest.signing-region".to_owned(),
                        rest_signing_region.clone(),
                    );
                }
                if let Some(rest_signing_name) = &self.rest_signing_name {
                    java_catalog_configs
                        .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
                }
                // Note: the S3 keys are forwarded whenever the option is set at all,
                // regardless of whether its value is `true` or `false`.
                if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
                    java_catalog_configs.insert(
                        "rest.sigv4-enabled".to_owned(),
                        rest_sigv4_enabled.to_string(),
                    );

                    if let Some(access_key) = &self.s3_access_key {
                        java_catalog_configs
                            .insert("rest.access-key-id".to_owned(), access_key.clone());
                    }

                    if let Some(secret_key) = &self.s3_secret_key {
                        java_catalog_configs
                            .insert("rest.secret-access-key".to_owned(), secret_key.clone());
                    }
                }
            }
            "glue" => {
                let glue_access_key = self.glue_access_key();
                let glue_secret_key = self.glue_secret_key();
                let has_glue_credentials =
                    glue_access_key.is_some() && glue_secret_key.is_some();
                // The custom credential provider is skipped only when config loading is
                // enabled and neither static credentials nor a Glue role are given.
                let should_configure_glue_provider = !enable_config_load
                    || has_glue_credentials
                    || self.glue_iam_role_arn.is_some();

                if should_configure_glue_provider {
                    java_catalog_configs.insert(
                        "client.credentials-provider".to_owned(),
                        "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
                    );
                    if let Some(region) = self.glue_region() {
                        java_catalog_configs.insert(
                            "client.credentials-provider.glue.region".to_owned(),
                            region.to_owned(),
                        );
                    }
                    if let Some(access_key) = glue_access_key {
                        java_catalog_configs.insert(
                            "client.credentials-provider.glue.access-key-id".to_owned(),
                            access_key.to_owned(),
                        );
                    }
                    if let Some(secret_key) = glue_secret_key {
                        java_catalog_configs.insert(
                            "client.credentials-provider.glue.secret-access-key".to_owned(),
                            secret_key.to_owned(),
                        );
                    }
                    if let Some(role_arn) = self.glue_iam_role_arn.as_deref() {
                        java_catalog_configs.insert(
                            "client.credentials-provider.glue.iam-role-arn".to_owned(),
                            role_arn.to_owned(),
                        );
                    }
                    if enable_config_load && !has_glue_credentials {
                        java_catalog_configs.insert(
                            "client.credentials-provider.glue.use-default-credential-chain"
                                .to_owned(),
                            "true".to_owned(),
                        );
                    }
                }

                // Overrides the `client.region` set from `s3.region` above and derives
                // the Glue endpoint from the region.
                if let Some(region) = self.glue_region() {
                    java_catalog_configs.insert("client.region".to_owned(), region.to_owned());
                    java_catalog_configs.insert(
                        "glue.endpoint".to_owned(),
                        format!("https://glue.{}.amazonaws.com", region),
                    );
                }

                if let Some(glue_id) = self.glue_id.as_deref() {
                    java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
                }
                self.apply_java_s3_file_io_assume_role_configs(&mut java_catalog_configs);
            }
            "jdbc" => {
                self.apply_java_aws_client_assume_role_configs(&mut java_catalog_configs);
            }
            _ => {}
        }
    }

    Ok((file_io_props, java_catalog_configs))
}
790
791 fn apply_java_s3_file_io_assume_role_configs(
792 &self,
793 java_catalog_configs: &mut HashMap<String, String>,
794 ) {
795 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
796 java_catalog_configs.insert(
797 "s3.client-factory-impl".to_owned(),
798 "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory".to_owned(),
799 );
800 java_catalog_configs.insert("s3.iam-role-arn".to_owned(), iam_role_arn.clone());
801 }
802 }
803
804 fn apply_java_aws_client_assume_role_configs(
805 &self,
806 java_catalog_configs: &mut HashMap<String, String>,
807 ) {
808 if let Some(iam_role_arn) = &self.s3_iam_role_arn {
809 java_catalog_configs.insert("client.assume-role.arn".to_owned(), iam_role_arn.clone());
810 java_catalog_configs.insert(
811 "client.factory".to_owned(),
812 "org.apache.iceberg.aws.AssumeRoleAwsClientFactory".to_owned(),
813 );
814 if let Some(region) = &self.s3_region {
815 java_catalog_configs.insert("client.assume-role.region".to_owned(), region.clone());
816 }
817 }
818 }
819}
820
821impl IcebergCommon {
/// Creates the catalog selected by [`Self::catalog_type`].
///
/// * `storage` — a `StorageCatalog` chosen by the scheme of `warehouse.path`
///   (`s3`/`s3a`, `gs`/`gcs`, `azblob`).
/// * `rest_rust` / `glue_rust` — the Rust REST / Glue catalog implementations.
/// * `hive` / `snowflake` / `jdbc` / `rest` / `glue` — a Java catalog built through JNI,
///   using the properties from `build_jni_catalog_configs`.
/// * `mock` — a `MockCatalog`.
///
/// `java_catalog_props` is only consulted for the JNI-backed catalog types.
///
/// # Errors
/// Fails for an unsupported catalog type or warehouse scheme, when a required option
/// (`warehouse.path`, `catalog.uri`) is missing, or when the underlying catalog builder
/// fails.
pub async fn create_catalog(
    &self,
    java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Arc<dyn Catalog>> {
    match self.catalog_type() {
        "storage" => {
            let warehouse = self
                .warehouse_path
                .clone()
                .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
            let url = Url::parse(warehouse.as_ref())
                .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;

            // The URL scheme of the warehouse path selects the storage backend.
            let config = match url.scheme() {
                "s3" | "s3a" => StorageCatalogConfig::S3(
                    storage_catalog::StorageCatalogS3Config::builder()
                        .warehouse(warehouse)
                        .access_key(self.s3_access_key.clone())
                        .secret_key(self.s3_secret_key.clone())
                        .region(self.s3_region.clone())
                        .endpoint(self.s3_endpoint.clone())
                        .path_style_access(self.s3_path_style_access)
                        .enable_config_load(Some(self.enable_config_load()))
                        .build(),
                ),
                "gs" | "gcs" => StorageCatalogConfig::Gcs(
                    storage_catalog::StorageCatalogGcsConfig::builder()
                        .warehouse(warehouse)
                        .credential(self.gcs_credential.clone())
                        .enable_config_load(Some(self.enable_config_load()))
                        .build(),
                ),
                "azblob" => StorageCatalogConfig::Azblob(
                    storage_catalog::StorageCatalogAzblobConfig::builder()
                        .warehouse(warehouse)
                        .account_name(self.azblob_account_name.clone())
                        .account_key(self.azblob_account_key.clone())
                        .endpoint(self.azblob_endpoint_url.clone())
                        .build(),
                ),
                scheme => bail!("Unsupported warehouse scheme: {}", scheme),
            };

            let catalog = storage_catalog::StorageCatalog::new(config)?;
            Ok(Arc::new(catalog))
        }
        "rest_rust" => {
            let mut iceberg_configs = HashMap::new();

            // A GCS credential takes precedence: when it is set, none of the S3
            // options are forwarded.
            if let Some(gcs_credential) = &self.gcs_credential {
                iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
            } else {
                if let Some(region) = &self.s3_region {
                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
                }
                if let Some(endpoint) = &self.s3_endpoint {
                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
                }
                if let Some(access_key) = &self.s3_access_key {
                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
                }
                if let Some(secret_key) = &self.s3_secret_key {
                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
                }
                if let Some(path_style_access) = &self.s3_path_style_access {
                    iceberg_configs.insert(
                        S3_PATH_STYLE_ACCESS.to_owned(),
                        path_style_access.to_string(),
                    );
                }
            };

            if let Some(credential) = &self.catalog_credential {
                iceberg_configs.insert("credential".to_owned(), credential.clone());
            }
            if let Some(token) = &self.catalog_token {
                iceberg_configs.insert("token".to_owned(), token.clone());
            }
            if let Some(oauth2_server_uri) = &self.catalog_oauth2_server_uri {
                iceberg_configs
                    .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
            }
            if let Some(scope) = &self.catalog_scope {
                iceberg_configs.insert("scope".to_owned(), scope.clone());
            }

            // Each catalog header is passed as a `header.<name>` property.
            let headers = self.headers()?;
            for (header_name, header_value) in headers {
                iceberg_configs.insert(format!("header.{}", header_name), header_value);
            }

            iceberg_configs.insert(
                iceberg_catalog_rest::REST_CATALOG_PROP_URI.to_owned(),
                self.catalog_uri
                    .clone()
                    .with_context(|| "`catalog.uri` must be set in rest catalog".to_owned())?,
            );
            if let Some(warehouse_path) = &self.warehouse_path {
                iceberg_configs.insert(
                    iceberg_catalog_rest::REST_CATALOG_PROP_WAREHOUSE.to_owned(),
                    warehouse_path.clone(),
                );
            }
            let catalog = iceberg_catalog_rest::RestCatalogBuilder::default()
                .load("rest", iceberg_configs)
                .await
                .map_err(|e| anyhow!(IcebergError::from(e)))?;
            Ok(Arc::new(catalog))
        }
        "glue_rust" => {
            let mut iceberg_configs = HashMap::new();
            // Glue client settings (each falls back to the matching S3 option).
            if let Some(region) = self.glue_region() {
                iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.to_owned());
            }
            if let Some(access_key) = self.glue_access_key() {
                iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.to_owned());
            }
            if let Some(secret_key) = self.glue_secret_key() {
                iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.to_owned());
            }
            // S3 file-IO settings.
            if let Some(region) = &self.s3_region {
                iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
            }
            if let Some(endpoint) = &self.s3_endpoint {
                iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
            }
            if let Some(access_key) = &self.s3_access_key {
                iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
            }
            if let Some(secret_key) = &self.s3_secret_key {
                iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
            }
            if let Some(role_arn) = &self.s3_iam_role_arn {
                iceberg_configs.insert(S3_ASSUME_ROLE_ARN.to_owned(), role_arn.clone());
            }
            if let Some(path_style_access) = &self.s3_path_style_access {
                iceberg_configs.insert(
                    S3_PATH_STYLE_ACCESS.to_owned(),
                    path_style_access.to_string(),
                );
            }
            iceberg_configs.insert(
                iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE.to_owned(),
                self.warehouse_path
                    .clone()
                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in glue catalog"))?,
            );
            if let Some(uri) = self.catalog_uri.as_deref() {
                iceberg_configs.insert(
                    iceberg_catalog_glue::GLUE_CATALOG_PROP_URI.to_owned(),
                    uri.to_owned(),
                );
            }
            let catalog = iceberg_catalog_glue::GlueCatalogBuilder::default()
                .load("glue", iceberg_configs)
                .await
                .map_err(|e| anyhow!(IcebergError::from(e)))?;
            Ok(Arc::new(catalog))
        }
        // Catalog types served by a Java implementation through JNI.
        catalog_type
            if catalog_type == "hive"
                || catalog_type == "snowflake"
                || catalog_type == "jdbc"
                || catalog_type == "rest"
                || catalog_type == "glue" =>
        {
            // Shadows the `java_catalog_props` parameter with the fully built map.
            let (file_io_props, java_catalog_props) =
                self.build_jni_catalog_configs(java_catalog_props)?;
            let catalog_impl = match catalog_type {
                "hive" => "org.apache.iceberg.hive.HiveCatalog",
                "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
                "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
                "rest" => "org.apache.iceberg.rest.RESTCatalog",
                "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
                // The match guard above admits only the five types listed here.
                _ => unreachable!(),
            };

            jni_catalog::JniCatalog::build_catalog(
                file_io_props,
                self.catalog_name(),
                catalog_impl,
                java_catalog_props,
            )
        }
        "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
        _ => {
            bail!(
                "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
                self.catalog_type()
            )
        }
    }
}
1020
1021 pub async fn load_table(
1023 &self,
1024 table: &IcebergTableIdentifier,
1025 java_catalog_props: &HashMap<String, String>,
1026 ) -> ConnectorResult<Table> {
1027 let catalog = self
1028 .create_catalog(java_catalog_props)
1029 .await
1030 .context("Unable to load iceberg catalog")?;
1031
1032 let table_id = table
1033 .to_table_ident()
1034 .context("Unable to parse table name")?;
1035
1036 let table = catalog.load_table(&table_id).await?;
1037 Ok(rebuild_table_with_shared_cache(table).await)
1038 }
1039}
1040
1041pub(crate) async fn shared_object_cache(
1043 init_object_cache: Arc<ObjectCache>,
1044 table_uuid: Uuid,
1045) -> Arc<ObjectCache> {
1046 static CACHE: LazyLock<MokaCache<Uuid, Arc<ObjectCache>>> = LazyLock::new(|| {
1047 MokaCache::builder()
1048 .max_capacity(SHARED_OBJECT_CACHE_MAX_TABLES)
1049 .build()
1050 });
1051
1052 CACHE
1053 .get_with(table_uuid, async { init_object_cache })
1054 .await
1055}
1056
1057pub async fn rebuild_table_with_shared_cache(table: Table) -> Table {
1058 let table_uuid = table.metadata().uuid();
1059 let init_object_cache = table.object_cache();
1060 let object_cache = shared_object_cache(init_object_cache, table_uuid).await;
1061 table.with_object_cache(object_cache)
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use std::collections::HashMap;
1067
1068 use super::*;
1069
/// Test fixture: an `IcebergCommon` for the given catalog type with only `s3.region`
/// (`ap-southeast-2`) and `warehouse.path` (`s3://bucket/warehouse`) populated; every
/// other option is `None`. Tests override individual fields with struct-update syntax.
fn test_common(catalog_type: &str) -> IcebergCommon {
    IcebergCommon {
        catalog_type: Some(catalog_type.to_owned()),
        s3_region: Some("ap-southeast-2".to_owned()),
        s3_endpoint: None,
        s3_access_key: None,
        s3_secret_key: None,
        s3_iam_role_arn: None,
        glue_access_key: None,
        glue_secret_key: None,
        glue_iam_role_arn: None,
        glue_region: None,
        glue_id: None,
        gcs_credential: None,
        azblob_account_name: None,
        azblob_account_key: None,
        azblob_endpoint_url: None,
        adlsgen2_account_name: None,
        adlsgen2_account_key: None,
        adlsgen2_endpoint: None,
        adlsgen2_tenant_id: None,
        adlsgen2_client_id: None,
        adlsgen2_client_secret: None,
        adlsgen2_authority_host: None,
        warehouse_path: Some("s3://bucket/warehouse".to_owned()),
        catalog_name: None,
        catalog_uri: None,
        catalog_credential: None,
        catalog_token: None,
        catalog_oauth2_server_uri: None,
        catalog_scope: None,
        rest_signing_region: None,
        rest_signing_name: None,
        rest_sigv4_enabled: None,
        s3_path_style_access: None,
        enable_config_load: None,
        hosted_catalog: None,
        catalog_header: None,
        vended_credentials: None,
        catalog_security: None,
        gcp_auth_scopes: None,
        catalog_io_impl: None,
    }
}
1114
1115 #[test]
1116 fn test_glue_jni_catalog_uses_s3_assume_role_for_file_io() {
1117 let common = IcebergCommon {
1118 s3_iam_role_arn: Some("arn:aws:iam::123456789012:role/risingwave-s3".to_owned()),
1119 ..test_common("glue")
1120 };
1121
1122 let (_, java_catalog_configs) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();
1123
1124 assert_eq!(
1125 java_catalog_configs.get("s3.client-factory-impl").unwrap(),
1126 "com.risingwave.connector.catalog.S3FileIOAssumeRoleAwsClientFactory"
1127 );
1128 assert_eq!(
1129 java_catalog_configs.get("s3.iam-role-arn").unwrap(),
1130 "arn:aws:iam::123456789012:role/risingwave-s3"
1131 );
1132 assert!(!java_catalog_configs.contains_key("client.factory"));
1133 }
1134
1135 #[test]
1136 fn test_adlsgen2_service_principal_populates_file_io_configs_with_default_authority_host() {
1137 let common = test_adlsgen2_service_principal_common(None);
1138
1139 let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();
1140
1141 assert_eq!(file_io_props.get(ADLS_TENANT_ID).unwrap(), "tenant-uuid");
1142 assert_eq!(file_io_props.get(ADLS_CLIENT_ID).unwrap(), "client-uuid");
1143 assert_eq!(
1144 file_io_props.get(ADLS_CLIENT_SECRET).unwrap(),
1145 "secret-value"
1146 );
1147 assert_eq!(
1148 file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
1149 ADLS_DEFAULT_AUTHORITY_HOST
1150 );
1151 }
1152
1153 fn test_adlsgen2_service_principal_common(authority_host: Option<&str>) -> IcebergCommon {
1154 IcebergCommon {
1155 adlsgen2_account_name: Some("acct".to_owned()),
1156 adlsgen2_tenant_id: Some("tenant-uuid".to_owned()),
1157 adlsgen2_client_id: Some("client-uuid".to_owned()),
1158 adlsgen2_client_secret: Some("secret-value".to_owned()),
1159 adlsgen2_authority_host: authority_host.map(str::to_owned),
1160 warehouse_path: Some("abfss://wh@acct.dfs.core.windows.net/wh".to_owned()),
1161 ..test_common("rest")
1162 }
1163 }
1164
1165 #[test]
1166 fn test_adlsgen2_service_principal_authority_host_override_is_respected() {
1167 let common =
1168 test_adlsgen2_service_principal_common(Some("https://login.microsoftonline.us"));
1169
1170 let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();
1171
1172 assert_eq!(
1173 file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
1174 "https://login.microsoftonline.us"
1175 );
1176 }
1177
1178 #[test]
1179 fn test_adlsgen2_authority_host_rejects_non_bare_https_origins() {
1180 let cases = [
1181 ("not a url", "does not parse as a URL"),
1182 (
1183 "http://login.microsoftonline.com",
1184 "must use the https scheme",
1185 ),
1186 (
1187 "https://user:pass@login.microsoftonline.com",
1188 "must not contain userinfo",
1189 ),
1190 (
1191 "https://login.microsoftonline.com?bar=baz",
1192 "must not contain a query or fragment",
1193 ),
1194 (
1195 "https://login.microsoftonline.com#frag",
1196 "must not contain a query or fragment",
1197 ),
1198 (
1199 "https://login.microsoftonline.com/foo",
1200 "must not contain a path component",
1201 ),
1202 ];
1203 for (authority_host, expected_error) in cases {
1204 let common = test_adlsgen2_service_principal_common(Some(authority_host));
1205 let err = common
1206 .build_jni_catalog_configs(&HashMap::new())
1207 .unwrap_err();
1208 assert!(
1209 format!("{:#}", err).contains(expected_error),
1210 "authority_host {authority_host:?}: expected error containing {expected_error:?}, got: {err:#}"
1211 );
1212 }
1213 }
1214
1215 #[test]
1216 fn test_adlsgen2_authority_host_trailing_slash_is_normalized() {
1217 let common =
1218 test_adlsgen2_service_principal_common(Some("https://login.microsoftonline.us/"));
1219
1220 let (file_io_props, _) = common.build_jni_catalog_configs(&HashMap::new()).unwrap();
1221
1222 assert_eq!(
1223 file_io_props.get(ADLS_AUTHORITY_HOST).unwrap(),
1224 "https://login.microsoftonline.us"
1225 );
1226 }
1227
1228 #[test]
1229 fn test_adlsgen2_rejects_mixing_shared_key_and_service_principal() {
1230 let common = IcebergCommon {
1231 adlsgen2_account_key: Some("shared-key".to_owned()),
1232 ..test_adlsgen2_service_principal_common(None)
1233 };
1234
1235 let err = common
1236 .build_jni_catalog_configs(&HashMap::new())
1237 .unwrap_err();
1238 assert!(
1239 format!("{:#}", err).contains("exactly one auth mode"),
1240 "expected mutual-exclusion error, got: {err:#}"
1241 );
1242 }
1243
1244 #[test]
1245 fn test_adlsgen2_rejects_partial_service_principal_config() {
1246 let common = IcebergCommon {
1247 adlsgen2_client_secret: None,
1248 ..test_adlsgen2_service_principal_common(None)
1249 };
1250
1251 let err = common
1252 .build_jni_catalog_configs(&HashMap::new())
1253 .unwrap_err();
1254 assert!(
1255 format!("{:#}", err).contains("requires all three"),
1256 "expected partial-config error, got: {err:#}"
1257 );
1258 }
1259
1260 #[test]
1261 fn test_iceberg_table_identifier_validation() {
1262 let valid_identifier = IcebergTableIdentifier {
1263 database_name: Some("valid_db".to_owned()),
1264 table_name: "test_table".to_owned(),
1265 };
1266 assert!(valid_identifier.validate().is_ok());
1267
1268 let valid_underscore = IcebergTableIdentifier {
1269 database_name: Some("valid_db_name".to_owned()),
1270 table_name: "test_table".to_owned(),
1271 };
1272 assert!(valid_underscore.validate().is_ok());
1273
1274 let no_database = IcebergTableIdentifier {
1275 database_name: None,
1276 table_name: "test_table".to_owned(),
1277 };
1278 assert!(no_database.validate().is_ok());
1279
1280 let empty_part = IcebergTableIdentifier {
1281 database_name: Some("a..b".to_owned()),
1282 table_name: "test_table".to_owned(),
1283 };
1284 let result = empty_part.validate();
1285 assert!(result.is_err());
1286 assert!(
1287 result
1288 .unwrap_err()
1289 .to_string()
1290 .contains("identifier parts must not be empty")
1291 );
1292
1293 let leading_dot = IcebergTableIdentifier {
1294 database_name: None,
1295 table_name: ".test_table".to_owned(),
1296 };
1297 let result = leading_dot.validate();
1298 assert!(result.is_err());
1299 assert!(
1300 result
1301 .unwrap_err()
1302 .to_string()
1303 .contains("identifier parts must not be empty")
1304 );
1305 }
1306
1307 #[test]
1308 fn test_iceberg_table_identifier_dots_as_namespace_separators() {
1309 let table_ident = IcebergTableIdentifier {
1310 database_name: Some("general.zia.stats".to_owned()),
1311 table_name: "tagged_security_transactions".to_owned(),
1312 }
1313 .to_table_ident()
1314 .unwrap();
1315 let namespace: Vec<_> = table_ident
1316 .namespace()
1317 .as_ref()
1318 .iter()
1319 .map(String::as_str)
1320 .collect();
1321 assert_eq!(namespace, vec!["general", "zia", "stats"]);
1322 assert_eq!(table_ident.name(), "tagged_security_transactions");
1323
1324 let table_ident = IcebergTableIdentifier {
1325 database_name: Some("general".to_owned()),
1326 table_name: "zia.stats.tagged_security_transactions".to_owned(),
1327 }
1328 .to_table_ident()
1329 .unwrap();
1330 let namespace: Vec<_> = table_ident
1331 .namespace()
1332 .as_ref()
1333 .iter()
1334 .map(String::as_str)
1335 .collect();
1336 assert_eq!(namespace, vec!["general", "zia", "stats"]);
1337 assert_eq!(table_ident.name(), "tagged_security_transactions");
1338
1339 let table_ident = IcebergTableIdentifier {
1340 database_name: None,
1341 table_name: "general.zia.stats.tagged_security_transactions".to_owned(),
1342 }
1343 .to_table_ident()
1344 .unwrap();
1345 let namespace: Vec<_> = table_ident
1346 .namespace()
1347 .as_ref()
1348 .iter()
1349 .map(String::as_str)
1350 .collect();
1351 assert_eq!(namespace, vec!["general", "zia", "stats"]);
1352 assert_eq!(table_ident.name(), "tagged_security_transactions");
1353 }
1354}