1#![expect(
18 clippy::disallowed_types,
19 reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33 TableUpdate,
34};
35use itertools::Itertools;
36use jni::objects::{GlobalRef, JObject};
37use risingwave_common::global_jvm::Jvm;
38use risingwave_jni_core::call_method;
39use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
40use serde::{Deserialize, Serialize};
41use thiserror_ext::AsReport;
42
43use crate::error::ConnectorResult;
44
/// JSON payload decoded from the string returned by the Java catalog's `createTable`
/// and `loadTable` calls.
///
/// Field names are matched in kebab-case (e.g. `metadata-location`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoadTableResponse {
    /// Location of the table's metadata file. Callers in this file reject the response
    /// when it is absent.
    pub metadata_location: Option<String>,
    /// Table metadata used to build the returned `Table`.
    pub metadata: TableMetadata,
    /// Not read anywhere in this file.
    // NOTE(review): with `rename_all = "kebab-case"` this field presumably maps to the
    // JSON key `-config` rather than `config`, so it would always be `None` — confirm.
    pub _config: Option<HashMap<String, String>>,
}
52
/// JSON payload sent to the Java catalog's `createTable` call.
///
/// Built from a [`TableCreation`] and serialized with kebab-case keys
/// (e.g. `partition-spec`, `write-order`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
    /// Table name, copied from `TableCreation::name`.
    pub name: String,
    /// Optional table location, copied from `TableCreation::location`.
    pub location: Option<String>,
    /// Table schema, copied from `TableCreation::schema`.
    pub schema: Schema,
    /// Optional partition spec, copied from `TableCreation::partition_spec`.
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order, copied from `TableCreation::sort_order`.
    pub write_order: Option<SortOrder>,
    /// Table properties, copied from `TableCreation::properties`.
    pub properties: HashMap<String, String>,
}
69
/// JSON payload sent to the Java catalog's `updateTable` call.
#[derive(Debug, Serialize, Deserialize)]
struct CommitTableRequest {
    /// Identifier of the table being committed to.
    identifier: TableIdent,
    /// Requirements taken from the `TableCommit`.
    requirements: Vec<TableRequirement>,
    /// Updates taken from the `TableCommit`.
    updates: Vec<TableUpdate>,
}
76
/// JSON payload decoded from the string returned by the Java catalog's `updateTable`
/// call. Keys are matched in kebab-case (e.g. `metadata-location`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CommitTableResponse {
    /// Location of the table's metadata file; used to pick the `FileIO` for the table.
    metadata_location: String,
    /// Table metadata used to build the returned `Table`.
    metadata: TableMetadata,
}
83
/// JSON payload decoded from the string returned by the Java catalog's `listNamespaces`
/// call. Keys are matched in kebab-case (e.g. `next-page-token`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListNamespacesResponse {
    /// Namespaces reported by the catalog.
    namespaces: Vec<NamespaceIdent>,
    /// Pagination token; not read anywhere in this file.
    next_page_token: Option<String>,
}
90
/// JSON payload decoded from the string returned by the Java catalog's `listTables`
/// call. Keys are matched in kebab-case (e.g. `next-page-token`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListTablesResponse {
    /// Identifiers of the tables reported by the catalog.
    identifiers: Vec<TableIdent>,
    /// Pagination token; not read anywhere in this file.
    next_page_token: Option<String>,
}
97
98impl From<&TableCreation> for CreateTableRequest {
99 fn from(value: &TableCreation) -> Self {
100 Self {
101 name: value.name.clone(),
102 location: value.location.clone(),
103 schema: value.schema.clone(),
104 partition_spec: value.partition_spec.clone(),
105 write_order: value.sort_order.clone(),
106 properties: value.properties.clone(),
107 }
108 }
109}
110
111fn namespace_to_string(namespace: &NamespaceIdent) -> String {
112 namespace.iter().join(".")
113}
114
/// An iceberg [`Catalog`] implementation that forwards calls over JNI to a Java
/// `JniCatalogWrapper` object.
#[derive(Debug)]
pub struct JniCatalog {
    /// Global reference to the Java `JniCatalogWrapper` instance; `close()` is invoked on
    /// it when this value is dropped.
    java_catalog: GlobalRef,
    /// JVM handle used for every JNI call.
    jvm: Jvm,
    /// Properties passed to the `FileIO` builder for tables returned by this catalog.
    file_io_props: HashMap<String, String>,
}
121
122#[async_trait]
123impl Catalog for JniCatalog {
124 async fn list_namespaces(
126 &self,
127 _parent: Option<&NamespaceIdent>,
128 ) -> iceberg::Result<Vec<NamespaceIdent>> {
129 execute_with_jni_env(self.jvm, |env| {
130 let result_json =
131 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
132 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
133
134 let rust_json_str = jobj_to_str(env, result_json)?;
135
136 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
137
138 Ok(resp.namespaces)
139 })
140 .map_err(|e| {
141 iceberg::Error::new(
142 iceberg::ErrorKind::Unexpected,
143 "Failed to list iceberg namespaces.",
144 )
145 .with_source(e)
146 })
147 }
148
149 async fn create_namespace(
151 &self,
152 namespace: &iceberg::NamespaceIdent,
153 _properties: HashMap<String, String>,
154 ) -> iceberg::Result<iceberg::Namespace> {
155 execute_with_jni_env(self.jvm, |env| {
156 let namespace_str = namespace_to_string(namespace);
157 let namespace_jstr = env.new_string(&namespace_str).unwrap();
158
159 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
160 &namespace_jstr)
161 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
162
163 Ok(Namespace::new(namespace.clone()))
164 })
165 .map_err(|e| {
166 iceberg::Error::new(
167 iceberg::ErrorKind::Unexpected,
168 "Failed to create namespace.",
169 )
170 .with_source(e)
171 })
172 }
173
174 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
176 todo!()
177 }
178
179 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
181 execute_with_jni_env(self.jvm, |env| {
182 let namespace_str = namespace_to_string(namespace);
183 let namespace_jstr = env.new_string(&namespace_str).unwrap();
184
185 let exists =
186 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
187 &namespace_jstr)
188 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
189
190 Ok(exists)
191 })
192 .map_err(|e| {
193 iceberg::Error::new(
194 iceberg::ErrorKind::Unexpected,
195 "Failed to check namespace exists.",
196 )
197 .with_source(e)
198 })
199 }
200
201 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
203 todo!()
204 }
205
206 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
208 execute_with_jni_env(self.jvm, |env| {
209 let namespace_str = namespace_to_string(namespace);
210 let namespace_jstr = env.new_string(&namespace_str).unwrap();
211
212 let result_json =
213 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
214 &namespace_jstr)
215 .with_context(|| {
216 format!("Failed to list iceberg tables in namespace: {}", namespace)
217 })?;
218
219 let rust_json_str = jobj_to_str(env, result_json)?;
220
221 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
222
223 Ok(resp.identifiers)
224 })
225 .map_err(|e| {
226 iceberg::Error::new(
227 iceberg::ErrorKind::Unexpected,
228 "Failed to list iceberg tables.",
229 )
230 .with_source(e)
231 })
232 }
233
234 async fn update_namespace(
235 &self,
236 _namespace: &NamespaceIdent,
237 _properties: HashMap<String, String>,
238 ) -> iceberg::Result<()> {
239 todo!()
240 }
241
242 async fn create_table(
244 &self,
245 namespace: &NamespaceIdent,
246 creation: TableCreation,
247 ) -> iceberg::Result<Table> {
248 execute_with_jni_env(self.jvm, |env| {
249 let namespace_str = namespace_to_string(namespace);
250 let namespace_jstr = env.new_string(&namespace_str).unwrap();
251
252 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
253
254 let creation_jstr = env.new_string(&creation_str).unwrap();
255
256 let result_json =
257 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
258 &namespace_jstr, &creation_jstr)
259 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
260
261 let rust_json_str = jobj_to_str(env, result_json)?;
262
263 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
264
265 let metadata_location = resp.metadata_location.ok_or_else(|| {
266 iceberg::Error::new(
267 iceberg::ErrorKind::FeatureUnsupported,
268 "Loading uncommitted table is not supported!",
269 )
270 })?;
271
272 let table_metadata = resp.metadata;
273
274 let file_io = FileIO::from_path(&metadata_location)?
275 .with_props(self.file_io_props.iter())
276 .build()?;
277
278 Ok(Table::builder()
279 .file_io(file_io)
280 .identifier(TableIdent::new(namespace.clone(), creation.name))
281 .metadata(table_metadata)
282 .build())
283 })
284 .map_err(|e| {
285 iceberg::Error::new(
286 iceberg::ErrorKind::Unexpected,
287 "Failed to create iceberg table.",
288 )
289 .with_source(e)
290 })?
291 }
292
293 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
295 execute_with_jni_env(self.jvm, |env| {
296 let table_name_str = table.to_string();
297
298 let table_name_jstr = env.new_string(&table_name_str).unwrap();
299
300 let result_json =
301 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
302 &table_name_jstr)
303 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
304
305 let rust_json_str = jobj_to_str(env, result_json)?;
306
307 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
308
309 let metadata_location = resp.metadata_location.ok_or_else(|| {
310 iceberg::Error::new(
311 iceberg::ErrorKind::FeatureUnsupported,
312 "Loading uncommitted table is not supported!",
313 )
314 })?;
315
316 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
317
318 let table_metadata = resp.metadata;
319
320 let file_io = FileIO::from_path(&metadata_location)?
321 .with_props(self.file_io_props.iter())
322 .build()?;
323
324 Ok(Table::builder()
325 .file_io(file_io)
326 .identifier(table.clone())
327 .metadata(table_metadata)
328 .build())
329 })
330 .map_err(|e| {
331 iceberg::Error::new(
332 iceberg::ErrorKind::Unexpected,
333 "Failed to load iceberg table.",
334 )
335 .with_source(e)
336 })?
337 }
338
339 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
341 let jvm = self.jvm;
342 let table = table.to_owned();
343 let java_catalog = self.java_catalog.clone();
344 tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
346 execute_with_jni_env(jvm, |env| {
347 let table_name_str = table.to_string();
348
349 let table_name_jstr = env.new_string(&table_name_str).unwrap();
350
351 call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
352 &table_name_jstr)
353 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
354
355 Ok(())
356 })
357 .map_err(|e| {
358 iceberg::Error::new(
359 iceberg::ErrorKind::Unexpected,
360 "Failed to drop iceberg table.",
361 )
362 .with_source(e)
363 })
364 })
365 .await
366 .map_err(|e| {
367 iceberg::Error::new(
368 iceberg::ErrorKind::Unexpected,
369 "Failed to drop iceberg table.",
370 )
371 .with_source(e)
372 })?
373 }
374
375 async fn register_table(
376 &self,
377 _table_ident: &TableIdent,
378 _metadata_location: String,
379 ) -> iceberg::Result<Table> {
380 Err(iceberg::Error::new(
381 iceberg::ErrorKind::Unexpected,
382 "register_table is not supported by JniCatalog",
383 ))
384 }
385
386 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
388 execute_with_jni_env(self.jvm, |env| {
389 let table_name_str = table.to_string();
390
391 let table_name_jstr = env.new_string(&table_name_str).unwrap();
392
393 let exists =
394 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
395 &table_name_jstr)
396 .with_context(|| {
397 format!("Failed to check iceberg table exists: {table_name_str}")
398 })?;
399
400 Ok(exists)
401 })
402 .map_err(|e| {
403 iceberg::Error::new(
404 iceberg::ErrorKind::Unexpected,
405 "Failed to check iceberg table exists.",
406 )
407 .with_source(e)
408 })
409 }
410
411 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
413 todo!()
414 }
415
416 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
418 execute_with_jni_env(self.jvm, |env| {
419 let requirements = commit.take_requirements();
420 let updates = commit.take_updates();
421 let request = CommitTableRequest {
422 identifier: commit.identifier().clone(),
423 requirements,
424 updates,
425 };
426 let request_str = serde_json::to_string(&request)?;
427
428 let request_jni_str = env.new_string(&request_str).with_context(|| {
429 format!("Failed to create jni string from request json: {request_str}.")
430 })?;
431
432 let result_json =
433 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
434 &request_jni_str)
435 .with_context(|| {
436 format!("Failed to update iceberg table: {}", commit.identifier())
437 })?;
438
439 let rust_json_str = jobj_to_str(env, result_json)?;
440
441 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
442
443 tracing::info!(
444 "Table metadata location of {} is {}",
445 commit.identifier(),
446 response.metadata_location
447 );
448
449 let table_metadata = response.metadata;
450
451 let file_io = FileIO::from_path(&response.metadata_location)?
452 .with_props(self.file_io_props.iter())
453 .build()?;
454
455 Ok(Table::builder()
456 .file_io(file_io)
457 .identifier(commit.identifier().clone())
458 .metadata(table_metadata)
459 .build()?)
460 })
461 .map_err(|e| {
462 iceberg::Error::new(
463 iceberg::ErrorKind::Unexpected,
464 "Failed to update iceberg table.",
465 )
466 .with_source(e)
467 })
468 }
469}
470
471impl Drop for JniCatalog {
472 fn drop(&mut self) {
473 let _ = execute_with_jni_env(self.jvm, |env| {
474 call_method!(env, self.java_catalog.as_obj(), {void close()})
475 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
476 Ok(())
477 })
478 .inspect_err(
479 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
480 );
481 }
482}
483
484impl JniCatalog {
485 fn build(
486 file_io_props: HashMap<String, String>,
487 name: impl ToString,
488 catalog_impl: impl ToString,
489 java_catalog_props: HashMap<String, String>,
490 ) -> ConnectorResult<Self> {
491 let jvm = Jvm::get_or_init()?;
492
493 execute_with_jni_env(jvm, |env| {
494 let props = env.new_object_array(
496 (java_catalog_props.len() * 2) as i32,
497 "java/lang/String",
498 JObject::null(),
499 )?;
500 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
501 let key_j_str = env.new_string(key)?;
502 let value_j_str = env.new_string(value)?;
503 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
504 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
505 }
506
507 let jni_catalog_wrapper = env
508 .call_static_method(
509 "com/risingwave/connector/catalog/JniCatalogWrapper",
510 "create",
511 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
512 &[
513 (&env.new_string(name.to_string()).unwrap()).into(),
514 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
515 (&props).into(),
516 ],
517 )?;
518
519 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
520
521 Ok(Self {
522 java_catalog: jni_catalog,
523 jvm,
524 file_io_props,
525 })
526 })
527 .map_err(Into::into)
528 }
529
530 pub fn build_catalog(
531 file_io_props: HashMap<String, String>,
532 name: impl ToString,
533 catalog_impl: impl ToString,
534 java_catalog_props: HashMap<String, String>,
535 ) -> ConnectorResult<Arc<dyn Catalog>> {
536 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
537 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
538 }
539}