1#![expect(
18 clippy::disallowed_types,
19 reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33 TableUpdate,
34};
35use itertools::Itertools;
36use jni::JavaVM;
37use jni::objects::{GlobalRef, JObject};
38use risingwave_common::bail;
39use risingwave_common::global_jvm::JVM;
40use risingwave_jni_core::call_method;
41use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
42use serde::{Deserialize, Serialize};
43use thiserror_ext::AsReport;
44
45use crate::error::ConnectorResult;
46
/// JSON payload parsed from the string returned by the Java catalog's
/// `createTable` and `loadTable` calls.
///
/// Field names are mapped to kebab-case JSON keys by the serde attribute.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoadTableResponse {
    /// Location of the table's metadata file. The callers in this file reject
    /// a response where it is absent.
    pub metadata_location: Option<String>,
    /// Table metadata carried in the response.
    pub metadata: TableMetadata,
    /// Never read in this file.
    /// NOTE(review): under `rename_all = "kebab-case"` the leading underscore
    /// presumably makes serde look for the key `-config` rather than `config`
    /// — confirm this is intended.
    pub _config: Option<HashMap<String, String>>,
}
54
/// JSON request passed to the Java catalog's `createTable` call, serialized
/// with kebab-case keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location.
    pub location: Option<String>,
    /// Schema of the new table.
    pub schema: Schema,
    /// Optional partition spec.
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; filled from `TableCreation::sort_order`.
    pub write_order: Option<SortOrder>,
    /// Table properties.
    pub properties: HashMap<String, String>,
}
71
/// JSON request passed to the Java catalog's `updateTable` call.
#[derive(Debug, Serialize, Deserialize)]
struct CommitTableRequest {
    /// Identifier of the table being committed to.
    identifier: TableIdent,
    /// Requirements taken from the `TableCommit`.
    requirements: Vec<TableRequirement>,
    /// Updates taken from the `TableCommit`.
    updates: Vec<TableUpdate>,
}
78
/// JSON payload parsed from the string returned by the Java catalog's
/// `updateTable` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CommitTableResponse {
    /// Location of the table's metadata file after the commit; used to build
    /// the `FileIO` of the returned table.
    metadata_location: String,
    /// Table metadata carried in the response.
    metadata: TableMetadata,
}
85
/// JSON payload parsed from the string returned by the Java catalog's
/// `listNamespaces` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListNamespacesResponse {
    /// Namespaces contained in the response.
    namespaces: Vec<NamespaceIdent>,
    /// Optional page token; not read anywhere in this file.
    next_page_token: Option<String>,
}
92
/// JSON payload parsed from the string returned by the Java catalog's
/// `listTables` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListTablesResponse {
    /// Table identifiers contained in the response.
    identifiers: Vec<TableIdent>,
    /// Optional page token; not read anywhere in this file.
    next_page_token: Option<String>,
}
99
100impl From<&TableCreation> for CreateTableRequest {
101 fn from(value: &TableCreation) -> Self {
102 Self {
103 name: value.name.clone(),
104 location: value.location.clone(),
105 schema: value.schema.clone(),
106 partition_spec: value.partition_spec.clone(),
107 write_order: value.sort_order.clone(),
108 properties: value.properties.clone(),
109 }
110 }
111}
112
/// An iceberg [`Catalog`] implementation that forwards its calls over JNI to
/// a Java-side `JniCatalogWrapper` object.
#[derive(Debug)]
pub struct JniCatalog {
    /// Global reference to the Java catalog wrapper object.
    java_catalog: GlobalRef,
    /// JVM used to obtain a JNI environment for each call.
    jvm: &'static JavaVM,
    /// Properties passed to the `FileIO` builder of every table returned.
    file_io_props: HashMap<String, String>,
}
119
120#[async_trait]
121impl Catalog for JniCatalog {
122 async fn list_namespaces(
124 &self,
125 _parent: Option<&NamespaceIdent>,
126 ) -> iceberg::Result<Vec<NamespaceIdent>> {
127 execute_with_jni_env(self.jvm, |env| {
128 let result_json =
129 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
130 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
131
132 let rust_json_str = jobj_to_str(env, result_json)?;
133
134 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
135
136 Ok(resp.namespaces)
137 })
138 .map_err(|e| {
139 iceberg::Error::new(
140 iceberg::ErrorKind::Unexpected,
141 "Failed to list iceberg namespaces.",
142 )
143 .with_source(e)
144 })
145 }
146
147 async fn create_namespace(
149 &self,
150 namespace: &iceberg::NamespaceIdent,
151 _properties: HashMap<String, String>,
152 ) -> iceberg::Result<iceberg::Namespace> {
153 execute_with_jni_env(self.jvm, |env| {
154 let namespace_jstr = if namespace.is_empty() {
155 env.new_string("").unwrap()
156 } else {
157 if namespace.len() > 1 {
158 bail!("Namespace with more than one level is not supported!")
159 }
160 env.new_string(&namespace[0]).unwrap()
161 };
162
163 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
164 &namespace_jstr)
165 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
166
167 Ok(Namespace::new(namespace.clone()))
168 })
169 .map_err(|e| {
170 iceberg::Error::new(
171 iceberg::ErrorKind::Unexpected,
172 "Failed to create namespace.",
173 )
174 .with_source(e)
175 })
176 }
177
178 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
180 todo!()
181 }
182
183 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
185 execute_with_jni_env(self.jvm, |env| {
186 let namespace_jstr = if namespace.is_empty() {
187 env.new_string("").unwrap()
188 } else {
189 if namespace.len() > 1 {
190 bail!("Namespace with more than one level is not supported!")
191 }
192 env.new_string(&namespace[0]).unwrap()
193 };
194
195 let exists =
196 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
197 &namespace_jstr)
198 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
199
200 Ok(exists)
201 })
202 .map_err(|e| {
203 iceberg::Error::new(
204 iceberg::ErrorKind::Unexpected,
205 "Failed to check namespace exists.",
206 )
207 .with_source(e)
208 })
209 }
210
211 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
213 todo!()
214 }
215
216 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
218 execute_with_jni_env(self.jvm, |env| {
219 let namespace_jstr = if namespace.is_empty() {
220 env.new_string("").unwrap()
221 } else {
222 if namespace.len() > 1 {
223 bail!("Namespace with more than one level is not supported!")
224 }
225 env.new_string(&namespace[0]).unwrap()
226 };
227
228 let result_json =
229 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
230 &namespace_jstr)
231 .with_context(|| {
232 format!("Failed to list iceberg tables in namespace: {}", namespace)
233 })?;
234
235 let rust_json_str = jobj_to_str(env, result_json)?;
236
237 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
238
239 Ok(resp.identifiers)
240 })
241 .map_err(|e| {
242 iceberg::Error::new(
243 iceberg::ErrorKind::Unexpected,
244 "Failed to list iceberg tables.",
245 )
246 .with_source(e)
247 })
248 }
249
250 async fn update_namespace(
251 &self,
252 _namespace: &NamespaceIdent,
253 _properties: HashMap<String, String>,
254 ) -> iceberg::Result<()> {
255 todo!()
256 }
257
258 async fn create_table(
260 &self,
261 namespace: &NamespaceIdent,
262 creation: TableCreation,
263 ) -> iceberg::Result<Table> {
264 execute_with_jni_env(self.jvm, |env| {
265 let namespace_jstr = if namespace.is_empty() {
266 env.new_string("").unwrap()
267 } else {
268 if namespace.len() > 1 {
269 bail!("Namespace with more than one level is not supported!")
270 }
271 env.new_string(&namespace[0]).unwrap()
272 };
273
274 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
275
276 let creation_jstr = env.new_string(&creation_str).unwrap();
277
278 let result_json =
279 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
280 &namespace_jstr, &creation_jstr)
281 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
282
283 let rust_json_str = jobj_to_str(env, result_json)?;
284
285 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
286
287 let metadata_location = resp.metadata_location.ok_or_else(|| {
288 iceberg::Error::new(
289 iceberg::ErrorKind::FeatureUnsupported,
290 "Loading uncommitted table is not supported!",
291 )
292 })?;
293
294 let table_metadata = resp.metadata;
295
296 let file_io = FileIO::from_path(&metadata_location)?
297 .with_props(self.file_io_props.iter())
298 .build()?;
299
300 Ok(Table::builder()
301 .file_io(file_io)
302 .identifier(TableIdent::new(namespace.clone(), creation.name))
303 .metadata(table_metadata)
304 .build())
305 })
306 .map_err(|e| {
307 iceberg::Error::new(
308 iceberg::ErrorKind::Unexpected,
309 "Failed to create iceberg table.",
310 )
311 .with_source(e)
312 })?
313 }
314
315 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
317 execute_with_jni_env(self.jvm, |env| {
318 let table_name_str = format!(
319 "{}.{}",
320 table.namespace().clone().inner().into_iter().join("."),
321 table.name()
322 );
323
324 let table_name_jstr = env.new_string(&table_name_str).unwrap();
325
326 let result_json =
327 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
328 &table_name_jstr)
329 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
330
331 let rust_json_str = jobj_to_str(env, result_json)?;
332
333 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
334
335 let metadata_location = resp.metadata_location.ok_or_else(|| {
336 iceberg::Error::new(
337 iceberg::ErrorKind::FeatureUnsupported,
338 "Loading uncommitted table is not supported!",
339 )
340 })?;
341
342 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
343
344 let table_metadata = resp.metadata;
345
346 let file_io = FileIO::from_path(&metadata_location)?
347 .with_props(self.file_io_props.iter())
348 .build()?;
349
350 Ok(Table::builder()
351 .file_io(file_io)
352 .identifier(table.clone())
353 .metadata(table_metadata)
354 .build())
355 })
356 .map_err(|e| {
357 iceberg::Error::new(
358 iceberg::ErrorKind::Unexpected,
359 "Failed to load iceberg table.",
360 )
361 .with_source(e)
362 })?
363 }
364
365 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
367 let jvm = self.jvm;
368 let table = table.to_owned();
369 let java_catalog = self.java_catalog.clone();
370 tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
372 execute_with_jni_env(jvm, |env| {
373 let table_name_str = format!(
374 "{}.{}",
375 table.namespace().clone().inner().into_iter().join("."),
376 table.name()
377 );
378
379 let table_name_jstr = env.new_string(&table_name_str).unwrap();
380
381 call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
382 &table_name_jstr)
383 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
384
385 Ok(())
386 })
387 .map_err(|e| {
388 iceberg::Error::new(
389 iceberg::ErrorKind::Unexpected,
390 "Failed to drop iceberg table.",
391 )
392 .with_source(e)
393 })
394 })
395 .await
396 .map_err(|e| {
397 iceberg::Error::new(
398 iceberg::ErrorKind::Unexpected,
399 "Failed to drop iceberg table.",
400 )
401 .with_source(e)
402 })?
403 }
404
405 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
407 execute_with_jni_env(self.jvm, |env| {
408 let table_name_str = format!(
409 "{}.{}",
410 table.namespace().clone().inner().into_iter().join("."),
411 table.name()
412 );
413
414 let table_name_jstr = env.new_string(&table_name_str).unwrap();
415
416 let exists =
417 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
418 &table_name_jstr)
419 .with_context(|| {
420 format!("Failed to check iceberg table exists: {table_name_str}")
421 })?;
422
423 Ok(exists)
424 })
425 .map_err(|e| {
426 iceberg::Error::new(
427 iceberg::ErrorKind::Unexpected,
428 "Failed to check iceberg table exists.",
429 )
430 .with_source(e)
431 })
432 }
433
434 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
436 todo!()
437 }
438
439 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
441 execute_with_jni_env(self.jvm, |env| {
442 let requirements = commit.take_requirements();
443 let updates = commit.take_updates();
444 let request = CommitTableRequest {
445 identifier: commit.identifier().clone(),
446 requirements,
447 updates,
448 };
449 let request_str = serde_json::to_string(&request)?;
450
451 let request_jni_str = env.new_string(&request_str).with_context(|| {
452 format!("Failed to create jni string from request json: {request_str}.")
453 })?;
454
455 let result_json =
456 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
457 &request_jni_str)
458 .with_context(|| {
459 format!("Failed to update iceberg table: {}", commit.identifier())
460 })?;
461
462 let rust_json_str = jobj_to_str(env, result_json)?;
463
464 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
465
466 tracing::info!(
467 "Table metadata location of {} is {}",
468 commit.identifier(),
469 response.metadata_location
470 );
471
472 let table_metadata = response.metadata;
473
474 let file_io = FileIO::from_path(&response.metadata_location)?
475 .with_props(self.file_io_props.iter())
476 .build()?;
477
478 Ok(Table::builder()
479 .file_io(file_io)
480 .identifier(commit.identifier().clone())
481 .metadata(table_metadata)
482 .build()?)
483 })
484 .map_err(|e| {
485 iceberg::Error::new(
486 iceberg::ErrorKind::Unexpected,
487 "Failed to update iceberg table.",
488 )
489 .with_source(e)
490 })
491 }
492}
493
494impl Drop for JniCatalog {
495 fn drop(&mut self) {
496 let _ = execute_with_jni_env(self.jvm, |env| {
497 call_method!(env, self.java_catalog.as_obj(), {void close()})
498 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
499 Ok(())
500 })
501 .inspect_err(
502 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
503 );
504 }
505}
506
507impl JniCatalog {
508 fn build(
509 file_io_props: HashMap<String, String>,
510 name: impl ToString,
511 catalog_impl: impl ToString,
512 java_catalog_props: HashMap<String, String>,
513 ) -> ConnectorResult<Self> {
514 let jvm = JVM.get_or_init()?;
515
516 execute_with_jni_env(jvm, |env| {
517 let props = env.new_object_array(
519 (java_catalog_props.len() * 2) as i32,
520 "java/lang/String",
521 JObject::null(),
522 )?;
523 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
524 let key_j_str = env.new_string(key)?;
525 let value_j_str = env.new_string(value)?;
526 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
527 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
528 }
529
530 let jni_catalog_wrapper = env
531 .call_static_method(
532 "com/risingwave/connector/catalog/JniCatalogWrapper",
533 "create",
534 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
535 &[
536 (&env.new_string(name.to_string()).unwrap()).into(),
537 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
538 (&props).into(),
539 ],
540 )?;
541
542 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
543
544 Ok(Self {
545 java_catalog: jni_catalog,
546 jvm,
547 file_io_props,
548 })
549 })
550 .map_err(Into::into)
551 }
552
553 pub fn build_catalog(
554 file_io_props: HashMap<String, String>,
555 name: impl ToString,
556 catalog_impl: impl ToString,
557 java_catalog_props: HashMap<String, String>,
558 ) -> ConnectorResult<Arc<dyn Catalog>> {
559 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
560 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
561 }
562}