1pub mod compaction;
16mod jni_catalog;
17mod mock_catalog;
18mod storage_catalog;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
24use ::iceberg::table::Table;
25use ::iceberg::{Catalog, TableIdent};
26use anyhow::{Context, anyhow};
27use iceberg::io::{
28 ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, AZBLOB_ACCOUNT_KEY, AZBLOB_ACCOUNT_NAME, AZBLOB_ENDPOINT,
29 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD, S3_PATH_STYLE_ACCESS,
30};
31use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
32use phf::{Set, phf_set};
33use risingwave_common::bail;
34use risingwave_common::util::deployment::Deployment;
35use risingwave_common::util::env_var::env_var_is_true;
36use serde::Deserialize;
37use serde_with::serde_as;
38use url::Url;
39use with_options::WithOptions;
40
41use crate::connector_common::common::DISABLE_DEFAULT_CREDENTIAL;
42use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
43use crate::deserialize_optional_bool_from_string;
44use crate::enforce_secret::EnforceSecret;
45use crate::error::ConnectorResult;
46
47#[serde_as]
48#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
49pub struct IcebergCommon {
50 #[serde(rename = "catalog.type")]
53 pub catalog_type: Option<String>,
54 #[serde(rename = "s3.region")]
55 pub region: Option<String>,
56 #[serde(rename = "s3.endpoint")]
57 pub endpoint: Option<String>,
58 #[serde(rename = "s3.access.key")]
59 pub access_key: Option<String>,
60 #[serde(rename = "s3.secret.key")]
61 pub secret_key: Option<String>,
62
63 #[serde(rename = "gcs.credential")]
64 pub gcs_credential: Option<String>,
65
66 #[serde(rename = "azblob.account_name")]
67 pub azblob_account_name: Option<String>,
68 #[serde(rename = "azblob.account_key")]
69 pub azblob_account_key: Option<String>,
70 #[serde(rename = "azblob.endpoint_url")]
71 pub azblob_endpoint_url: Option<String>,
72
73 #[serde(rename = "adlsgen2.account_name")]
74 pub adlsgen2_account_name: Option<String>,
75 #[serde(rename = "adlsgen2.account_key")]
76 pub adlsgen2_account_key: Option<String>,
77 #[serde(rename = "adlsgen2.endpoint")]
78 pub adlsgen2_endpoint: Option<String>,
79
80 #[serde(rename = "warehouse.path")]
82 pub warehouse_path: Option<String>,
83 #[serde(rename = "glue.id")]
86 pub glue_id: Option<String>,
87 #[serde(rename = "catalog.name")]
89 pub catalog_name: Option<String>,
90 #[serde(rename = "catalog.uri")]
92 pub catalog_uri: Option<String>,
93 #[serde(rename = "catalog.credential")]
96 pub credential: Option<String>,
97 #[serde(rename = "catalog.token")]
100 pub token: Option<String>,
101 #[serde(rename = "catalog.oauth2_server_uri")]
104 pub oauth2_server_uri: Option<String>,
105 #[serde(rename = "catalog.scope")]
108 pub scope: Option<String>,
109
110 #[serde(rename = "catalog.rest.signing_region")]
112 pub rest_signing_region: Option<String>,
113
114 #[serde(rename = "catalog.rest.signing_name")]
116 pub rest_signing_name: Option<String>,
117
118 #[serde(
120 rename = "catalog.rest.sigv4_enabled",
121 default,
122 deserialize_with = "deserialize_optional_bool_from_string"
123 )]
124 pub rest_sigv4_enabled: Option<bool>,
125
126 #[serde(
127 rename = "s3.path.style.access",
128 default,
129 deserialize_with = "deserialize_optional_bool_from_string"
130 )]
131 pub path_style_access: Option<bool>,
132 #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
134 pub enable_config_load: Option<bool>,
135
136 #[serde(
138 rename = "hosted_catalog",
139 default,
140 deserialize_with = "deserialize_optional_bool_from_string"
141 )]
142 pub hosted_catalog: Option<bool>,
143
144 #[serde(rename = "catalog.header")]
151 pub header: Option<String>,
152
153 #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
155 pub vended_credentials: Option<bool>,
156}
157
158impl EnforceSecret for IcebergCommon {
159 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
160 "s3.access.key",
161 "s3.secret.key",
162 "gcs.credential",
163 "catalog.credential",
164 "catalog.token",
165 "catalog.oauth2_server_uri",
166 "adlsgen2.account_key",
167 "adlsgen2.client_secret",
168 };
169}
170
171#[serde_as]
172#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
173#[serde(deny_unknown_fields)]
174pub struct IcebergTableIdentifier {
175 #[serde(rename = "database.name")]
176 pub database_name: Option<String>,
177 #[serde(rename = "table.name")]
179 pub table_name: String,
180}
181
182impl IcebergTableIdentifier {
183 pub fn database_name(&self) -> Option<&str> {
184 self.database_name.as_deref()
185 }
186
187 pub fn table_name(&self) -> &str {
188 &self.table_name
189 }
190
191 pub fn to_table_ident(&self) -> ConnectorResult<TableIdent> {
192 let ret = if let Some(database_name) = &self.database_name {
193 TableIdent::from_strs(vec![database_name, &self.table_name])
194 } else {
195 TableIdent::from_strs(vec![&self.table_name])
196 };
197
198 Ok(ret.context("Failed to create table identifier")?)
199 }
200}
201
202impl IcebergCommon {
203 pub fn catalog_type(&self) -> &str {
204 let catalog_type: &str = self.catalog_type.as_deref().unwrap_or("storage");
205 if self.vended_credentials() && catalog_type == "rest" {
206 "rest_rust"
207 } else {
208 catalog_type
209 }
210 }
211
212 pub fn vended_credentials(&self) -> bool {
213 self.vended_credentials.unwrap_or(false)
214 }
215
216 pub fn catalog_name(&self) -> String {
217 self.catalog_name
218 .as_ref()
219 .cloned()
220 .unwrap_or_else(|| "risingwave".to_owned())
221 }
222
223 pub fn headers(&self) -> ConnectorResult<HashMap<String, String>> {
224 let mut headers = HashMap::new();
225 let user_agent = match Deployment::current() {
226 Deployment::Ci => "RisingWave(CI)".to_owned(),
227 Deployment::Cloud => "RisingWave(Cloud)".to_owned(),
228 Deployment::Other => "RisingWave(OSS)".to_owned(),
229 };
230 if self.vended_credentials() {
231 headers.insert(
232 "X-Iceberg-Access-Delegation".to_owned(),
233 "vended-credentials".to_owned(),
234 );
235 }
236 headers.insert("User-Agent".to_owned(), user_agent);
237 if let Some(header) = &self.header {
238 for pair in header.split(';') {
239 let mut parts = pair.split('=');
240 if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
241 headers.insert(key.to_owned(), value.to_owned());
242 } else {
243 bail!("Invalid header format: {}", pair);
244 }
245 }
246 }
247 Ok(headers)
248 }
249
250 pub fn enable_config_load(&self) -> bool {
251 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
253 return false;
254 }
255 self.enable_config_load.unwrap_or(false)
256 }
257
258 fn build_jni_catalog_configs(
260 &self,
261 java_catalog_props: &HashMap<String, String>,
262 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
263 let mut iceberg_configs = HashMap::new();
264 let enable_config_load = self.enable_config_load();
265 let file_io_props = {
266 let catalog_type = self.catalog_type().to_owned();
267
268 if let Some(region) = &self.region {
269 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
271 }
272
273 if let Some(endpoint) = &self.endpoint {
274 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
276 }
277
278 if let Some(access_key) = &self.access_key {
280 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
281 }
282 if let Some(secret_key) = &self.secret_key {
283 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
284 }
285 if let Some(gcs_credential) = &self.gcs_credential {
286 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
287 if catalog_type != "rest" && catalog_type != "rest_rust" {
288 bail!("gcs unsupported in {} catalog", &catalog_type);
289 }
290 }
291
292 if let (
293 Some(azblob_account_name),
294 Some(azblob_account_key),
295 Some(azblob_endpoint_url),
296 ) = (
297 &self.azblob_account_name,
298 &self.azblob_account_key,
299 &self.azblob_endpoint_url,
300 ) {
301 iceberg_configs.insert(AZBLOB_ACCOUNT_NAME.to_owned(), azblob_account_name.clone());
302 iceberg_configs.insert(AZBLOB_ACCOUNT_KEY.to_owned(), azblob_account_key.clone());
303 iceberg_configs.insert(AZBLOB_ENDPOINT.to_owned(), azblob_endpoint_url.clone());
304
305 if catalog_type != "rest" && catalog_type != "rest_rust" {
306 bail!("azblob unsupported in {} catalog", &catalog_type);
307 }
308 }
309
310 if let (Some(account_name), Some(account_key)) = (
311 self.adlsgen2_account_name.as_ref(),
312 self.adlsgen2_account_key.as_ref(),
313 ) {
314 iceberg_configs.insert(ADLS_ACCOUNT_NAME.to_owned(), account_name.clone());
315 iceberg_configs.insert(ADLS_ACCOUNT_KEY.to_owned(), account_key.clone());
316 if catalog_type != "rest" && catalog_type != "rest_rust" {
317 bail!("adlsgen2 unsupported in {} catalog", &catalog_type);
318 }
319 }
320
321 match &self.warehouse_path {
322 Some(warehouse_path) => {
323 let (bucket, _) = {
324 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
325 let url = Url::parse(warehouse_path);
326 if (url.is_err() || is_s3_tables)
327 && (catalog_type == "rest" || catalog_type == "rest_rust")
328 {
329 (None, None)
333 } else {
334 let url = url.with_context(|| {
335 format!("Invalid warehouse path: {}", warehouse_path)
336 })?;
337 let bucket = url
338 .host_str()
339 .with_context(|| {
340 format!(
341 "Invalid s3 path: {}, bucket is missing",
342 warehouse_path
343 )
344 })?
345 .to_owned();
346 let root = url.path().trim_start_matches('/').to_owned();
347 (Some(bucket), Some(root))
348 }
349 };
350
351 if let Some(bucket) = bucket {
352 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
353 }
354 }
355 None => {
356 if catalog_type != "rest" && catalog_type != "rest_rust" {
357 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
358 }
359 }
360 }
361 iceberg_configs.insert(
362 S3_DISABLE_CONFIG_LOAD.to_owned(),
363 (!enable_config_load).to_string(),
364 );
365
366 iceberg_configs.insert(
367 GCS_DISABLE_CONFIG_LOAD.to_owned(),
368 (!enable_config_load).to_string(),
369 );
370
371 if let Some(path_style_access) = self.path_style_access {
372 iceberg_configs.insert(
373 S3_PATH_STYLE_ACCESS.to_owned(),
374 path_style_access.to_string(),
375 );
376 }
377
378 iceberg_configs
379 };
380
381 let mut java_catalog_configs = HashMap::new();
383 {
384 if let Some(uri) = self.catalog_uri.as_deref() {
385 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
386 }
387
388 if let Some(warehouse_path) = &self.warehouse_path {
389 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
390 }
391 java_catalog_configs.extend(java_catalog_props.clone());
392
393 java_catalog_configs.insert(
395 "io-impl".to_owned(),
396 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
397 );
398
399 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
401
402 if let Some(region) = &self.region {
403 java_catalog_configs.insert("client.region".to_owned(), region.clone());
404 }
405 if let Some(endpoint) = &self.endpoint {
406 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
407 }
408
409 if let Some(access_key) = &self.access_key {
410 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
411 }
412 if let Some(secret_key) = &self.secret_key {
413 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
414 }
415
416 if let Some(path_style_access) = &self.path_style_access {
417 java_catalog_configs.insert(
418 "s3.path-style-access".to_owned(),
419 path_style_access.to_string(),
420 );
421 }
422
423 let headers = self.headers()?;
424 for (header_name, header_value) in headers {
425 java_catalog_configs.insert(format!("header.{}", header_name), header_value);
426 }
427
428 match self.catalog_type() {
429 "rest" => {
430 if let Some(credential) = &self.credential {
431 java_catalog_configs.insert("credential".to_owned(), credential.clone());
432 }
433 if let Some(token) = &self.token {
434 java_catalog_configs.insert("token".to_owned(), token.clone());
435 }
436 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
437 java_catalog_configs
438 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
439 }
440 if let Some(scope) = &self.scope {
441 java_catalog_configs.insert("scope".to_owned(), scope.clone());
442 }
443 if let Some(rest_signing_region) = &self.rest_signing_region {
444 java_catalog_configs.insert(
445 "rest.signing-region".to_owned(),
446 rest_signing_region.clone(),
447 );
448 }
449 if let Some(rest_signing_name) = &self.rest_signing_name {
450 java_catalog_configs
451 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
452 }
453 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
454 java_catalog_configs.insert(
455 "rest.sigv4-enabled".to_owned(),
456 rest_sigv4_enabled.to_string(),
457 );
458
459 if let Some(access_key) = &self.access_key {
460 java_catalog_configs
461 .insert("rest.access-key-id".to_owned(), access_key.clone());
462 }
463
464 if let Some(secret_key) = &self.secret_key {
465 java_catalog_configs
466 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
467 }
468 }
469 }
470 "glue" => {
471 if !enable_config_load {
472 java_catalog_configs.insert(
473 "client.credentials-provider".to_owned(),
474 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
475 );
476 if let Some(access_key) = &self.access_key {
479 java_catalog_configs.insert(
480 "client.credentials-provider.glue.access-key-id".to_owned(),
481 access_key.clone(),
482 );
483 }
484 if let Some(secret_key) = &self.secret_key {
485 java_catalog_configs.insert(
486 "client.credentials-provider.glue.secret-access-key".to_owned(),
487 secret_key.clone(),
488 );
489 }
490 }
491
492 if let Some(region) = &self.region {
493 java_catalog_configs.insert("client.region".to_owned(), region.clone());
494 java_catalog_configs.insert(
495 "glue.endpoint".to_owned(),
496 format!("https://glue.{}.amazonaws.com", region),
497 );
498 }
499
500 if let Some(glue_id) = self.glue_id.as_deref() {
501 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
502 }
503 }
504 _ => {}
505 }
506 }
507
508 Ok((file_io_props, java_catalog_configs))
509 }
510}
511
512impl IcebergCommon {
513 pub async fn create_catalog(
515 &self,
516 java_catalog_props: &HashMap<String, String>,
517 ) -> ConnectorResult<Arc<dyn Catalog>> {
518 match self.catalog_type() {
519 "storage" => {
520 let warehouse = self
521 .warehouse_path
522 .clone()
523 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
524 let url = Url::parse(warehouse.as_ref())
525 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
526
527 let config = match url.scheme() {
528 "s3" | "s3a" => StorageCatalogConfig::S3(
529 storage_catalog::StorageCatalogS3Config::builder()
530 .warehouse(warehouse)
531 .access_key(self.access_key.clone())
532 .secret_key(self.secret_key.clone())
533 .region(self.region.clone())
534 .endpoint(self.endpoint.clone())
535 .path_style_access(self.path_style_access)
536 .enable_config_load(Some(self.enable_config_load()))
537 .build(),
538 ),
539 "gs" | "gcs" => StorageCatalogConfig::Gcs(
540 storage_catalog::StorageCatalogGcsConfig::builder()
541 .warehouse(warehouse)
542 .credential(self.gcs_credential.clone())
543 .enable_config_load(Some(self.enable_config_load()))
544 .build(),
545 ),
546 "azblob" => StorageCatalogConfig::Azblob(
547 storage_catalog::StorageCatalogAzblobConfig::builder()
548 .warehouse(warehouse)
549 .account_name(self.azblob_account_name.clone())
550 .account_key(self.azblob_account_key.clone())
551 .endpoint(self.azblob_endpoint_url.clone())
552 .build(),
553 ),
554 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
555 };
556
557 let catalog = storage_catalog::StorageCatalog::new(config)?;
558 Ok(Arc::new(catalog))
559 }
560 "rest_rust" => {
561 let mut iceberg_configs = HashMap::new();
562
563 if let Some(gcs_credential) = &self.gcs_credential {
565 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
566 } else {
567 if let Some(region) = &self.region {
568 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
569 }
570 if let Some(endpoint) = &self.endpoint {
571 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
572 }
573 if let Some(access_key) = &self.access_key {
574 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
575 }
576 if let Some(secret_key) = &self.secret_key {
577 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
578 }
579 if let Some(path_style_access) = &self.path_style_access {
580 iceberg_configs.insert(
581 S3_PATH_STYLE_ACCESS.to_owned(),
582 path_style_access.to_string(),
583 );
584 }
585 };
586
587 if let Some(credential) = &self.credential {
588 iceberg_configs.insert("credential".to_owned(), credential.clone());
589 }
590 if let Some(token) = &self.token {
591 iceberg_configs.insert("token".to_owned(), token.clone());
592 }
593 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
594 iceberg_configs
595 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
596 }
597 if let Some(scope) = &self.scope {
598 iceberg_configs.insert("scope".to_owned(), scope.clone());
599 }
600
601 let headers = self.headers()?;
602 for (header_name, header_value) in headers {
603 iceberg_configs.insert(format!("header.{}", header_name), header_value);
604 }
605
606 let config_builder =
607 iceberg_catalog_rest::RestCatalogConfig::builder()
608 .uri(self.catalog_uri.clone().with_context(|| {
609 "`catalog.uri` must be set in rest catalog".to_owned()
610 })?)
611 .props(iceberg_configs);
612
613 let config = match &self.warehouse_path {
614 Some(warehouse_path) => {
615 config_builder.warehouse(warehouse_path.clone()).build()
616 }
617 None => config_builder.build(),
618 };
619 let catalog = iceberg_catalog_rest::RestCatalog::new(config);
620 Ok(Arc::new(catalog))
621 }
622 "glue_rust" => {
623 let mut iceberg_configs = HashMap::new();
624 if let Some(region) = &self.region {
626 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
627 }
628 if let Some(access_key) = &self.access_key {
629 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
630 }
631 if let Some(secret_key) = &self.secret_key {
632 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
633 }
634 if let Some(region) = &self.region {
636 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
637 }
638 if let Some(endpoint) = &self.endpoint {
639 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
640 }
641 if let Some(access_key) = &self.access_key {
642 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
643 }
644 if let Some(secret_key) = &self.secret_key {
645 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
646 }
647 if let Some(path_style_access) = &self.path_style_access {
648 iceberg_configs.insert(
649 S3_PATH_STYLE_ACCESS.to_owned(),
650 path_style_access.to_string(),
651 );
652 }
653 let config_builder =
654 iceberg_catalog_glue::GlueCatalogConfig::builder()
655 .warehouse(self.warehouse_path.clone().ok_or_else(|| {
656 anyhow!("`warehouse.path` must be set in glue catalog")
657 })?)
658 .props(iceberg_configs);
659 let config = if let Some(uri) = self.catalog_uri.as_deref() {
660 config_builder.uri(uri.to_owned()).build()
661 } else {
662 config_builder.build()
663 };
664 let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
665 Ok(Arc::new(catalog))
666 }
667 catalog_type
668 if catalog_type == "hive"
669 || catalog_type == "snowflake"
670 || catalog_type == "jdbc"
671 || catalog_type == "rest"
672 || catalog_type == "glue" =>
673 {
674 let (file_io_props, java_catalog_props) =
676 self.build_jni_catalog_configs(java_catalog_props)?;
677 let catalog_impl = match catalog_type {
678 "hive" => "org.apache.iceberg.hive.HiveCatalog",
679 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
680 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
681 "rest" => "org.apache.iceberg.rest.RESTCatalog",
682 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
683 _ => unreachable!(),
684 };
685
686 jni_catalog::JniCatalog::build_catalog(
687 file_io_props,
688 self.catalog_name(),
689 catalog_impl,
690 java_catalog_props,
691 )
692 }
693 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
694 _ => {
695 bail!(
696 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
697 self.catalog_type()
698 )
699 }
700 }
701 }
702
703 pub async fn load_table(
705 &self,
706 table: &IcebergTableIdentifier,
707 java_catalog_props: &HashMap<String, String>,
708 ) -> ConnectorResult<Table> {
709 let catalog = self
710 .create_catalog(java_catalog_props)
711 .await
712 .context("Unable to load iceberg catalog")?;
713
714 let table_id = table
715 .to_table_ident()
716 .context("Unable to parse table name")?;
717
718 catalog.load_table(&table_id).await.map_err(Into::into)
719 }
720}