risingwave_connector/connector_common/iceberg/
mod.rs1mod jni_catalog;
16mod mock_catalog;
17mod storage_catalog;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
22use ::iceberg::table::Table;
23use ::iceberg::{Catalog, TableIdent};
24use anyhow::{Context, anyhow};
25use iceberg::io::{GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};
26use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
27use risingwave_common::bail;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use url::Url;
31use with_options::WithOptions;
32
33use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
34use crate::deserialize_optional_bool_from_string;
35use crate::error::ConnectorResult;
36
37#[serde_as]
38#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
39pub struct IcebergCommon {
40 #[serde(rename = "catalog.type")]
43 pub catalog_type: Option<String>,
44 #[serde(rename = "s3.region")]
45 pub region: Option<String>,
46 #[serde(rename = "s3.endpoint")]
47 pub endpoint: Option<String>,
48 #[serde(rename = "s3.access.key")]
49 pub access_key: Option<String>,
50 #[serde(rename = "s3.secret.key")]
51 pub secret_key: Option<String>,
52
53 #[serde(rename = "gcs.credential")]
54 pub gcs_credential: Option<String>,
55
56 #[serde(rename = "warehouse.path")]
58 pub warehouse_path: Option<String>,
59 #[serde(rename = "glue.id")]
62 pub glue_id: Option<String>,
63 #[serde(rename = "catalog.name")]
65 pub catalog_name: Option<String>,
66 #[serde(rename = "catalog.uri")]
68 pub catalog_uri: Option<String>,
69 #[serde(rename = "database.name")]
70 pub database_name: Option<String>,
71 #[serde(rename = "table.name")]
73 pub table_name: String,
74 #[serde(rename = "catalog.credential")]
77 pub credential: Option<String>,
78 #[serde(rename = "catalog.token")]
81 pub token: Option<String>,
82 #[serde(rename = "catalog.oauth2_server_uri")]
85 pub oauth2_server_uri: Option<String>,
86 #[serde(rename = "catalog.scope")]
89 pub scope: Option<String>,
90
91 #[serde(rename = "catalog.rest.signing_region")]
93 pub rest_signing_region: Option<String>,
94
95 #[serde(rename = "catalog.rest.signing_name")]
97 pub rest_signing_name: Option<String>,
98
99 #[serde(
101 rename = "catalog.rest.sigv4_enabled",
102 default,
103 deserialize_with = "deserialize_optional_bool_from_string"
104 )]
105 pub rest_sigv4_enabled: Option<bool>,
106
107 #[serde(
108 rename = "s3.path.style.access",
109 default,
110 deserialize_with = "deserialize_optional_bool_from_string"
111 )]
112 pub path_style_access: Option<bool>,
113 #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
115 pub enable_config_load: Option<bool>,
116}
117
118impl IcebergCommon {
119 pub fn catalog_type(&self) -> &str {
120 self.catalog_type.as_deref().unwrap_or("storage")
121 }
122
123 pub fn catalog_name(&self) -> String {
124 self.catalog_name
125 .as_ref()
126 .map(|s| s.to_string())
127 .unwrap_or_else(|| "risingwave".to_owned())
128 }
129
130 fn build_jni_catalog_configs(
132 &self,
133 java_catalog_props: &HashMap<String, String>,
134 ) -> ConnectorResult<(HashMap<String, String>, HashMap<String, String>)> {
135 let mut iceberg_configs = HashMap::new();
136 let enable_config_load = self.enable_config_load.unwrap_or(false);
137 let file_io_props = {
138 let catalog_type = self.catalog_type().to_owned();
139
140 if let Some(region) = &self.region {
141 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
143 }
144
145 if let Some(endpoint) = &self.endpoint {
146 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
148 }
149
150 if let Some(access_key) = &self.access_key {
152 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
153 }
154 if let Some(secret_key) = &self.secret_key {
155 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
156 }
157 if let Some(gcs_credential) = &self.gcs_credential {
158 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
159 if catalog_type != "rest" && catalog_type != "rest_rust" {
160 bail!("gcs unsupported in {} catalog", &catalog_type);
161 }
162 }
163
164 match &self.warehouse_path {
165 Some(warehouse_path) => {
166 let (bucket, _) = {
167 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
168 let url = Url::parse(warehouse_path);
169 if (url.is_err() || is_s3_tables)
170 && (catalog_type == "rest" || catalog_type == "rest_rust")
171 {
172 (None, None)
176 } else {
177 let url = url.with_context(|| {
178 format!("Invalid warehouse path: {}", warehouse_path)
179 })?;
180 let bucket = url
181 .host_str()
182 .with_context(|| {
183 format!(
184 "Invalid s3 path: {}, bucket is missing",
185 warehouse_path
186 )
187 })?
188 .to_owned();
189 let root = url.path().trim_start_matches('/').to_owned();
190 (Some(bucket), Some(root))
191 }
192 };
193
194 if let Some(bucket) = bucket {
195 iceberg_configs.insert("iceberg.table.io.bucket".to_owned(), bucket);
196 }
197 }
198 None => {
199 if catalog_type != "rest" && catalog_type != "rest_rust" {
200 bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
201 }
202 }
203 }
204 iceberg_configs.insert(
205 S3_DISABLE_CONFIG_LOAD.to_owned(),
206 (!enable_config_load).to_string(),
207 );
208
209 iceberg_configs.insert(
210 GCS_DISABLE_CONFIG_LOAD.to_owned(),
211 (!enable_config_load).to_string(),
212 );
213
214 iceberg_configs
215 };
216
217 let mut java_catalog_configs = HashMap::new();
219 {
220 if let Some(uri) = self.catalog_uri.as_deref() {
221 java_catalog_configs.insert("uri".to_owned(), uri.to_owned());
222 }
223
224 if let Some(warehouse_path) = &self.warehouse_path {
225 java_catalog_configs.insert("warehouse".to_owned(), warehouse_path.clone());
226 }
227 java_catalog_configs.extend(java_catalog_props.clone());
228
229 java_catalog_configs.insert(
231 "io-impl".to_owned(),
232 "org.apache.iceberg.aws.s3.S3FileIO".to_owned(),
233 );
234
235 java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
237
238 if let Some(region) = &self.region {
239 java_catalog_configs.insert("client.region".to_owned(), region.clone());
240 }
241 if let Some(endpoint) = &self.endpoint {
242 java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
243 }
244
245 if let Some(access_key) = &self.access_key {
246 java_catalog_configs.insert("s3.access-key-id".to_owned(), access_key.clone());
247 }
248 if let Some(secret_key) = &self.secret_key {
249 java_catalog_configs.insert("s3.secret-access-key".to_owned(), secret_key.clone());
250 }
251
252 if let Some(path_style_access) = self.path_style_access {
253 java_catalog_configs.insert(
254 "s3.path-style-access".to_owned(),
255 path_style_access.to_string(),
256 );
257 }
258
259 match self.catalog_type.as_deref() {
260 Some("rest") => {
261 if let Some(credential) = &self.credential {
262 java_catalog_configs.insert("credential".to_owned(), credential.clone());
263 }
264 if let Some(token) = &self.token {
265 java_catalog_configs.insert("token".to_owned(), token.clone());
266 }
267 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
268 java_catalog_configs
269 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
270 }
271 if let Some(scope) = &self.scope {
272 java_catalog_configs.insert("scope".to_owned(), scope.clone());
273 }
274 if let Some(rest_signing_region) = &self.rest_signing_region {
275 java_catalog_configs.insert(
276 "rest.signing-region".to_owned(),
277 rest_signing_region.clone(),
278 );
279 }
280 if let Some(rest_signing_name) = &self.rest_signing_name {
281 java_catalog_configs
282 .insert("rest.signing-name".to_owned(), rest_signing_name.clone());
283 }
284 if let Some(rest_sigv4_enabled) = self.rest_sigv4_enabled {
285 java_catalog_configs.insert(
286 "rest.sigv4-enabled".to_owned(),
287 rest_sigv4_enabled.to_string(),
288 );
289
290 if let Some(access_key) = &self.access_key {
291 java_catalog_configs
292 .insert("rest.access-key-id".to_owned(), access_key.clone());
293 }
294
295 if let Some(secret_key) = &self.secret_key {
296 java_catalog_configs
297 .insert("rest.secret-access-key".to_owned(), secret_key.clone());
298 }
299 }
300 }
301 Some("glue") => {
302 if !enable_config_load {
303 java_catalog_configs.insert(
304 "client.credentials-provider".to_owned(),
305 "com.risingwave.connector.catalog.GlueCredentialProvider".to_owned(),
306 );
307 if let Some(access_key) = &self.access_key {
310 java_catalog_configs.insert(
311 "client.credentials-provider.glue.access-key-id".to_owned(),
312 access_key.clone(),
313 );
314 }
315 if let Some(secret_key) = &self.secret_key {
316 java_catalog_configs.insert(
317 "client.credentials-provider.glue.secret-access-key".to_owned(),
318 secret_key.clone(),
319 );
320 }
321 }
322
323 if let Some(region) = &self.region {
324 java_catalog_configs.insert("client.region".to_owned(), region.clone());
325 java_catalog_configs.insert(
326 "glue.endpoint".to_owned(),
327 format!("https://glue.{}.amazonaws.com", region),
328 );
329 }
330
331 if let Some(glue_id) = self.glue_id.as_deref() {
332 java_catalog_configs.insert("glue.id".to_owned(), glue_id.to_owned());
333 }
334 }
335 _ => {}
336 }
337 }
338
339 Ok((file_io_props, java_catalog_configs))
340 }
341}
342
343impl IcebergCommon {
344 pub fn full_table_name(&self) -> ConnectorResult<TableIdent> {
345 let ret = if let Some(database_name) = &self.database_name {
346 TableIdent::from_strs(vec![database_name, &self.table_name])
347 } else {
348 TableIdent::from_strs(vec![&self.table_name])
349 };
350
351 Ok(ret.context("Failed to create table identifier")?)
352 }
353
354 pub async fn create_catalog(
356 &self,
357 java_catalog_props: &HashMap<String, String>,
358 ) -> ConnectorResult<Arc<dyn Catalog>> {
359 match self.catalog_type() {
360 "storage" => {
361 let warehouse = self
362 .warehouse_path
363 .clone()
364 .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
365 let url = Url::parse(warehouse.as_ref())
366 .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
367
368 let config = match url.scheme() {
369 "s3" | "s3a" => StorageCatalogConfig::S3(
370 storage_catalog::StorageCatalogS3Config::builder()
371 .warehouse(warehouse)
372 .access_key(self.access_key.clone())
373 .secret_key(self.secret_key.clone())
374 .region(self.region.clone())
375 .endpoint(self.endpoint.clone())
376 .enable_config_load(self.enable_config_load)
377 .build(),
378 ),
379 "gs" | "gcs" => StorageCatalogConfig::Gcs(
380 storage_catalog::StorageCatalogGcsConfig::builder()
381 .warehouse(warehouse)
382 .credential(self.gcs_credential.clone())
383 .enable_config_load(self.enable_config_load)
384 .build(),
385 ),
386 scheme => bail!("Unsupported warehouse scheme: {}", scheme),
387 };
388
389 let catalog = storage_catalog::StorageCatalog::new(config)?;
390 Ok(Arc::new(catalog))
391 }
392 "rest_rust" => {
393 let mut iceberg_configs = HashMap::new();
394
395 if let Some(gcs_credential) = &self.gcs_credential {
397 iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
398 } else {
399 if let Some(region) = &self.region {
400 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
401 }
402 if let Some(endpoint) = &self.endpoint {
403 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
404 }
405 if let Some(access_key) = &self.access_key {
406 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
407 }
408 if let Some(secret_key) = &self.secret_key {
409 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
410 }
411 };
412
413 if let Some(credential) = &self.credential {
414 iceberg_configs.insert("credential".to_owned(), credential.clone());
415 }
416 if let Some(token) = &self.token {
417 iceberg_configs.insert("token".to_owned(), token.clone());
418 }
419 if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
420 iceberg_configs
421 .insert("oauth2-server-uri".to_owned(), oauth2_server_uri.clone());
422 }
423 if let Some(scope) = &self.scope {
424 iceberg_configs.insert("scope".to_owned(), scope.clone());
425 }
426
427 let config_builder =
428 iceberg_catalog_rest::RestCatalogConfig::builder()
429 .uri(self.catalog_uri.clone().with_context(|| {
430 "`catalog.uri` must be set in rest catalog".to_owned()
431 })?)
432 .props(iceberg_configs);
433
434 let config = match &self.warehouse_path {
435 Some(warehouse_path) => {
436 config_builder.warehouse(warehouse_path.clone()).build()
437 }
438 None => config_builder.build(),
439 };
440 let catalog = iceberg_catalog_rest::RestCatalog::new(config);
441 Ok(Arc::new(catalog))
442 }
443 "glue_rust" => {
444 let mut iceberg_configs = HashMap::new();
445 if let Some(region) = &self.region {
447 iceberg_configs.insert(AWS_REGION_NAME.to_owned(), region.clone());
448 }
449 if let Some(access_key) = &self.access_key {
450 iceberg_configs.insert(AWS_ACCESS_KEY_ID.to_owned(), access_key.clone());
451 }
452 if let Some(secret_key) = &self.secret_key {
453 iceberg_configs.insert(AWS_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
454 }
455 if let Some(region) = &self.region {
457 iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
458 }
459 if let Some(endpoint) = &self.endpoint {
460 iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
461 }
462 if let Some(access_key) = &self.access_key {
463 iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
464 }
465 if let Some(secret_key) = &self.secret_key {
466 iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
467 }
468 let config_builder =
469 iceberg_catalog_glue::GlueCatalogConfig::builder()
470 .warehouse(self.warehouse_path.clone().ok_or_else(|| {
471 anyhow!("`warehouse.path` must be set in glue catalog")
472 })?)
473 .props(iceberg_configs);
474 let config = if let Some(uri) = self.catalog_uri.as_deref() {
475 config_builder.uri(uri.to_owned()).build()
476 } else {
477 config_builder.build()
478 };
479 let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
480 Ok(Arc::new(catalog))
481 }
482 catalog_type
483 if catalog_type == "hive"
484 || catalog_type == "snowflake"
485 || catalog_type == "jdbc"
486 || catalog_type == "rest"
487 || catalog_type == "glue" =>
488 {
489 let (file_io_props, java_catalog_props) =
491 self.build_jni_catalog_configs(java_catalog_props)?;
492 let catalog_impl = match catalog_type {
493 "hive" => "org.apache.iceberg.hive.HiveCatalog",
494 "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
495 "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
496 "rest" => "org.apache.iceberg.rest.RESTCatalog",
497 "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
498 _ => unreachable!(),
499 };
500
501 jni_catalog::JniCatalog::build_catalog(
502 file_io_props,
503 self.catalog_name(),
504 catalog_impl,
505 java_catalog_props,
506 )
507 }
508 "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
509 _ => {
510 bail!(
511 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`, `snowflake`",
512 self.catalog_type()
513 )
514 }
515 }
516 }
517
518 pub async fn load_table(
520 &self,
521 java_catalog_props: &HashMap<String, String>,
522 ) -> ConnectorResult<Table> {
523 let catalog = self
524 .create_catalog(java_catalog_props)
525 .await
526 .context("Unable to load iceberg catalog")?;
527
528 let table_id = self
529 .full_table_name()
530 .context("Unable to parse table name")?;
531
532 catalog.load_table(&table_id).await.map_err(Into::into)
533 }
534}