1#![expect(
18 clippy::disallowed_types,
19 reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33 TableUpdate,
34};
35use itertools::Itertools;
36use jni::objects::{GlobalRef, JObject};
37use risingwave_common::bail;
38use risingwave_common::global_jvm::Jvm;
39use risingwave_jni_core::call_method;
40use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
41use serde::{Deserialize, Serialize};
42use thiserror_ext::AsReport;
43
44use crate::error::ConnectorResult;
45
/// JSON payload returned by the Java catalog's `createTable` and `loadTable` methods.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoadTableResponse {
    /// Location of the table's metadata file; `None` is rejected by the callers in this file
    /// ("Loading uncommitted table is not supported!").
    pub metadata_location: Option<String>,
    /// The table metadata itself.
    pub metadata: TableMetadata,
    /// NOTE(review): with `rename_all = "kebab-case"` serde turns underscores into hyphens,
    /// so this field is looked up under the JSON key `-config`, not `config`; since it is an
    /// `Option` it silently stays `None`. It is also never read in this file.
    pub _config: Option<HashMap<String, String>>,
}
53
/// Request body serialized to JSON and passed as the second argument of the Java catalog's
/// `createTable(String, String)`. Keys are kebab-case (e.g. `partition-spec`, `write-order`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
    /// Table name (without namespace).
    pub name: String,
    /// Optional explicit table location.
    pub location: Option<String>,
    /// Table schema.
    pub schema: Schema,
    /// Optional unbound partition spec.
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order, emitted under the key `write-order`.
    pub write_order: Option<SortOrder>,
    /// Table properties.
    pub properties: HashMap<String, String>,
}
70
/// Request body serialized to JSON and passed to the Java catalog's `updateTable(String)`.
/// No `rename_all` is applied; the single-word field names are used as-is.
#[derive(Debug, Serialize, Deserialize)]
struct CommitTableRequest {
    /// Table being committed to.
    identifier: TableIdent,
    /// Requirements taken from the `TableCommit`.
    requirements: Vec<TableRequirement>,
    /// Updates taken from the `TableCommit`.
    updates: Vec<TableUpdate>,
}
77
/// JSON payload returned by the Java catalog's `updateTable`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CommitTableResponse {
    /// Location of the new metadata file after the commit (JSON key `metadata-location`).
    metadata_location: String,
    /// Table metadata after the commit.
    metadata: TableMetadata,
}
84
/// JSON payload returned by the Java catalog's `listNamespaces()`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListNamespacesResponse {
    /// Namespaces in the returned page.
    namespaces: Vec<NamespaceIdent>,
    /// Pagination token (JSON key `next-page-token`); deserialized but not followed by the
    /// caller in this file, which only returns `namespaces`.
    next_page_token: Option<String>,
}
91
/// JSON payload returned by the Java catalog's `listTables(String)`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListTablesResponse {
    /// Table identifiers in the returned page.
    identifiers: Vec<TableIdent>,
    /// Pagination token (JSON key `next-page-token`); deserialized but not followed by the
    /// caller in this file, which only returns `identifiers`.
    next_page_token: Option<String>,
}
98
99impl From<&TableCreation> for CreateTableRequest {
100 fn from(value: &TableCreation) -> Self {
101 Self {
102 name: value.name.clone(),
103 location: value.location.clone(),
104 schema: value.schema.clone(),
105 partition_spec: value.partition_spec.clone(),
106 write_order: value.sort_order.clone(),
107 properties: value.properties.clone(),
108 }
109 }
110}
111
/// An iceberg [`Catalog`] implemented by delegating to a Java catalog object over JNI.
#[derive(Debug)]
pub struct JniCatalog {
    /// Global reference to the Java object returned by `JniCatalogWrapper.create`.
    java_catalog: GlobalRef,
    /// JVM handle passed to `execute_with_jni_env` for every call into Java.
    jvm: Jvm,
    /// Properties applied to every `FileIO` built for tables returned by this catalog.
    file_io_props: HashMap<String, String>,
}
118
119#[async_trait]
120impl Catalog for JniCatalog {
121 async fn list_namespaces(
123 &self,
124 _parent: Option<&NamespaceIdent>,
125 ) -> iceberg::Result<Vec<NamespaceIdent>> {
126 execute_with_jni_env(self.jvm, |env| {
127 let result_json =
128 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
129 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
130
131 let rust_json_str = jobj_to_str(env, result_json)?;
132
133 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
134
135 Ok(resp.namespaces)
136 })
137 .map_err(|e| {
138 iceberg::Error::new(
139 iceberg::ErrorKind::Unexpected,
140 "Failed to list iceberg namespaces.",
141 )
142 .with_source(e)
143 })
144 }
145
146 async fn create_namespace(
148 &self,
149 namespace: &iceberg::NamespaceIdent,
150 _properties: HashMap<String, String>,
151 ) -> iceberg::Result<iceberg::Namespace> {
152 execute_with_jni_env(self.jvm, |env| {
153 let namespace_jstr = if namespace.is_empty() {
154 env.new_string("").unwrap()
155 } else {
156 if namespace.len() > 1 {
157 bail!("Namespace with more than one level is not supported!")
158 }
159 env.new_string(&namespace[0]).unwrap()
160 };
161
162 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
163 &namespace_jstr)
164 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
165
166 Ok(Namespace::new(namespace.clone()))
167 })
168 .map_err(|e| {
169 iceberg::Error::new(
170 iceberg::ErrorKind::Unexpected,
171 "Failed to create namespace.",
172 )
173 .with_source(e)
174 })
175 }
176
177 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
179 todo!()
180 }
181
182 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
184 execute_with_jni_env(self.jvm, |env| {
185 let namespace_jstr = if namespace.is_empty() {
186 env.new_string("").unwrap()
187 } else {
188 if namespace.len() > 1 {
189 bail!("Namespace with more than one level is not supported!")
190 }
191 env.new_string(&namespace[0]).unwrap()
192 };
193
194 let exists =
195 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
196 &namespace_jstr)
197 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
198
199 Ok(exists)
200 })
201 .map_err(|e| {
202 iceberg::Error::new(
203 iceberg::ErrorKind::Unexpected,
204 "Failed to check namespace exists.",
205 )
206 .with_source(e)
207 })
208 }
209
210 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
212 todo!()
213 }
214
215 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
217 execute_with_jni_env(self.jvm, |env| {
218 let namespace_jstr = if namespace.is_empty() {
219 env.new_string("").unwrap()
220 } else {
221 if namespace.len() > 1 {
222 bail!("Namespace with more than one level is not supported!")
223 }
224 env.new_string(&namespace[0]).unwrap()
225 };
226
227 let result_json =
228 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
229 &namespace_jstr)
230 .with_context(|| {
231 format!("Failed to list iceberg tables in namespace: {}", namespace)
232 })?;
233
234 let rust_json_str = jobj_to_str(env, result_json)?;
235
236 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
237
238 Ok(resp.identifiers)
239 })
240 .map_err(|e| {
241 iceberg::Error::new(
242 iceberg::ErrorKind::Unexpected,
243 "Failed to list iceberg tables.",
244 )
245 .with_source(e)
246 })
247 }
248
249 async fn update_namespace(
250 &self,
251 _namespace: &NamespaceIdent,
252 _properties: HashMap<String, String>,
253 ) -> iceberg::Result<()> {
254 todo!()
255 }
256
257 async fn create_table(
259 &self,
260 namespace: &NamespaceIdent,
261 creation: TableCreation,
262 ) -> iceberg::Result<Table> {
263 execute_with_jni_env(self.jvm, |env| {
264 let namespace_jstr = if namespace.is_empty() {
265 env.new_string("").unwrap()
266 } else {
267 if namespace.len() > 1 {
268 bail!("Namespace with more than one level is not supported!")
269 }
270 env.new_string(&namespace[0]).unwrap()
271 };
272
273 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
274
275 let creation_jstr = env.new_string(&creation_str).unwrap();
276
277 let result_json =
278 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
279 &namespace_jstr, &creation_jstr)
280 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
281
282 let rust_json_str = jobj_to_str(env, result_json)?;
283
284 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
285
286 let metadata_location = resp.metadata_location.ok_or_else(|| {
287 iceberg::Error::new(
288 iceberg::ErrorKind::FeatureUnsupported,
289 "Loading uncommitted table is not supported!",
290 )
291 })?;
292
293 let table_metadata = resp.metadata;
294
295 let file_io = FileIO::from_path(&metadata_location)?
296 .with_props(self.file_io_props.iter())
297 .build()?;
298
299 Ok(Table::builder()
300 .file_io(file_io)
301 .identifier(TableIdent::new(namespace.clone(), creation.name))
302 .metadata(table_metadata)
303 .build())
304 })
305 .map_err(|e| {
306 iceberg::Error::new(
307 iceberg::ErrorKind::Unexpected,
308 "Failed to create iceberg table.",
309 )
310 .with_source(e)
311 })?
312 }
313
314 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
316 execute_with_jni_env(self.jvm, |env| {
317 let table_name_str = format!(
318 "{}.{}",
319 table.namespace().clone().inner().into_iter().join("."),
320 table.name()
321 );
322
323 let table_name_jstr = env.new_string(&table_name_str).unwrap();
324
325 let result_json =
326 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
327 &table_name_jstr)
328 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
329
330 let rust_json_str = jobj_to_str(env, result_json)?;
331
332 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
333
334 let metadata_location = resp.metadata_location.ok_or_else(|| {
335 iceberg::Error::new(
336 iceberg::ErrorKind::FeatureUnsupported,
337 "Loading uncommitted table is not supported!",
338 )
339 })?;
340
341 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
342
343 let table_metadata = resp.metadata;
344
345 let file_io = FileIO::from_path(&metadata_location)?
346 .with_props(self.file_io_props.iter())
347 .build()?;
348
349 Ok(Table::builder()
350 .file_io(file_io)
351 .identifier(table.clone())
352 .metadata(table_metadata)
353 .build())
354 })
355 .map_err(|e| {
356 iceberg::Error::new(
357 iceberg::ErrorKind::Unexpected,
358 "Failed to load iceberg table.",
359 )
360 .with_source(e)
361 })?
362 }
363
364 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
366 let jvm = self.jvm;
367 let table = table.to_owned();
368 let java_catalog = self.java_catalog.clone();
369 tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
371 execute_with_jni_env(jvm, |env| {
372 let table_name_str = format!(
373 "{}.{}",
374 table.namespace().clone().inner().into_iter().join("."),
375 table.name()
376 );
377
378 let table_name_jstr = env.new_string(&table_name_str).unwrap();
379
380 call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
381 &table_name_jstr)
382 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
383
384 Ok(())
385 })
386 .map_err(|e| {
387 iceberg::Error::new(
388 iceberg::ErrorKind::Unexpected,
389 "Failed to drop iceberg table.",
390 )
391 .with_source(e)
392 })
393 })
394 .await
395 .map_err(|e| {
396 iceberg::Error::new(
397 iceberg::ErrorKind::Unexpected,
398 "Failed to drop iceberg table.",
399 )
400 .with_source(e)
401 })?
402 }
403
404 async fn register_table(
405 &self,
406 _table_ident: &TableIdent,
407 _metadata_location: String,
408 ) -> iceberg::Result<Table> {
409 Err(iceberg::Error::new(
410 iceberg::ErrorKind::Unexpected,
411 "register_table is not supported by JniCatalog",
412 ))
413 }
414
415 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
417 execute_with_jni_env(self.jvm, |env| {
418 let table_name_str = format!(
419 "{}.{}",
420 table.namespace().clone().inner().into_iter().join("."),
421 table.name()
422 );
423
424 let table_name_jstr = env.new_string(&table_name_str).unwrap();
425
426 let exists =
427 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
428 &table_name_jstr)
429 .with_context(|| {
430 format!("Failed to check iceberg table exists: {table_name_str}")
431 })?;
432
433 Ok(exists)
434 })
435 .map_err(|e| {
436 iceberg::Error::new(
437 iceberg::ErrorKind::Unexpected,
438 "Failed to check iceberg table exists.",
439 )
440 .with_source(e)
441 })
442 }
443
444 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
446 todo!()
447 }
448
449 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
451 execute_with_jni_env(self.jvm, |env| {
452 let requirements = commit.take_requirements();
453 let updates = commit.take_updates();
454 let request = CommitTableRequest {
455 identifier: commit.identifier().clone(),
456 requirements,
457 updates,
458 };
459 let request_str = serde_json::to_string(&request)?;
460
461 let request_jni_str = env.new_string(&request_str).with_context(|| {
462 format!("Failed to create jni string from request json: {request_str}.")
463 })?;
464
465 let result_json =
466 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
467 &request_jni_str)
468 .with_context(|| {
469 format!("Failed to update iceberg table: {}", commit.identifier())
470 })?;
471
472 let rust_json_str = jobj_to_str(env, result_json)?;
473
474 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
475
476 tracing::info!(
477 "Table metadata location of {} is {}",
478 commit.identifier(),
479 response.metadata_location
480 );
481
482 let table_metadata = response.metadata;
483
484 let file_io = FileIO::from_path(&response.metadata_location)?
485 .with_props(self.file_io_props.iter())
486 .build()?;
487
488 Ok(Table::builder()
489 .file_io(file_io)
490 .identifier(commit.identifier().clone())
491 .metadata(table_metadata)
492 .build()?)
493 })
494 .map_err(|e| {
495 iceberg::Error::new(
496 iceberg::ErrorKind::Unexpected,
497 "Failed to update iceberg table.",
498 )
499 .with_source(e)
500 })
501 }
502}
503
504impl Drop for JniCatalog {
505 fn drop(&mut self) {
506 let _ = execute_with_jni_env(self.jvm, |env| {
507 call_method!(env, self.java_catalog.as_obj(), {void close()})
508 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
509 Ok(())
510 })
511 .inspect_err(
512 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
513 );
514 }
515}
516
517impl JniCatalog {
518 fn build(
519 file_io_props: HashMap<String, String>,
520 name: impl ToString,
521 catalog_impl: impl ToString,
522 java_catalog_props: HashMap<String, String>,
523 ) -> ConnectorResult<Self> {
524 let jvm = Jvm::get_or_init()?;
525
526 execute_with_jni_env(jvm, |env| {
527 let props = env.new_object_array(
529 (java_catalog_props.len() * 2) as i32,
530 "java/lang/String",
531 JObject::null(),
532 )?;
533 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
534 let key_j_str = env.new_string(key)?;
535 let value_j_str = env.new_string(value)?;
536 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
537 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
538 }
539
540 let jni_catalog_wrapper = env
541 .call_static_method(
542 "com/risingwave/connector/catalog/JniCatalogWrapper",
543 "create",
544 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
545 &[
546 (&env.new_string(name.to_string()).unwrap()).into(),
547 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
548 (&props).into(),
549 ],
550 )?;
551
552 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
553
554 Ok(Self {
555 java_catalog: jni_catalog,
556 jvm,
557 file_io_props,
558 })
559 })
560 .map_err(Into::into)
561 }
562
563 pub fn build_catalog(
564 file_io_props: HashMap<String, String>,
565 name: impl ToString,
566 catalog_impl: impl ToString,
567 java_catalog_props: HashMap<String, String>,
568 ) -> ConnectorResult<Arc<dyn Catalog>> {
569 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
570 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
571 }
572}