1#![expect(
18 clippy::disallowed_types,
19 reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33 TableUpdate,
34};
35use itertools::Itertools;
36use jni::JavaVM;
37use jni::objects::{GlobalRef, JObject};
38use risingwave_common::bail;
39use risingwave_jni_core::call_method;
40use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env, jobj_to_str};
41use serde::{Deserialize, Serialize};
42use thiserror_ext::AsReport;
43
44use crate::error::ConnectorResult;
45
/// JSON response returned by the Java catalog's `createTable` and `loadTable` calls.
///
/// Keys are expected in kebab-case (e.g. `metadata-location`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoadTableResponse {
    /// Location of the table's metadata file. Callers in this file reject `None`
    /// ("Loading uncommitted table is not supported!").
    pub metadata_location: Option<String>,
    /// Table metadata used to build the returned `Table`.
    pub metadata: TableMetadata,
    /// Not read anywhere in this file.
    // NOTE(review): with `rename_all = "kebab-case"` serde maps `_config` to the key `-config`,
    // so a `config` entry in the response is probably never captured here — confirm whether
    // it is needed before relying on it.
    pub _config: Option<HashMap<String, String>>,
}
53
/// JSON request body sent to the Java catalog's `createTable` call.
///
/// Built from an iceberg [`TableCreation`] by the `From<&TableCreation>` impl; field names are
/// serialized in kebab-case (e.g. `partition-spec`, `write-order`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location.
    pub location: Option<String>,
    /// Table schema.
    pub schema: Schema,
    /// Optional (unbound) partition spec.
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; filled from `TableCreation::sort_order`.
    pub write_order: Option<SortOrder>,
    /// Table properties.
    pub properties: HashMap<String, String>,
}
70
/// JSON request body sent to the Java catalog's `updateTable` call.
#[derive(Debug, Serialize, Deserialize)]
struct CommitTableRequest {
    /// Table being committed to.
    identifier: TableIdent,
    /// Requirements taken from the `TableCommit`.
    requirements: Vec<TableRequirement>,
    /// Updates taken from the `TableCommit`.
    updates: Vec<TableUpdate>,
}
77
/// JSON response returned by the Java catalog's `updateTable` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CommitTableResponse {
    /// Location of the metadata file after the commit; also used to build the `FileIO`.
    metadata_location: String,
    /// Table metadata after the commit.
    metadata: TableMetadata,
}
84
/// JSON response returned by the Java catalog's `listNamespaces` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListNamespacesResponse {
    /// Namespaces returned by the Java catalog.
    namespaces: Vec<NamespaceIdent>,
    /// Pagination token; not read by `list_namespaces`, so further pages are never requested.
    next_page_token: Option<String>,
}
91
/// JSON response returned by the Java catalog's `listTables` call (kebab-case keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListTablesResponse {
    /// Table identifiers returned by the Java catalog.
    identifiers: Vec<TableIdent>,
    /// Pagination token; not read by `list_tables`, so further pages are never requested.
    next_page_token: Option<String>,
}
98
99impl From<&TableCreation> for CreateTableRequest {
100 fn from(value: &TableCreation) -> Self {
101 Self {
102 name: value.name.clone(),
103 location: value.location.clone(),
104 schema: value.schema.clone(),
105 partition_spec: value.partition_spec.clone(),
106 write_order: value.sort_order.clone(),
107 properties: value.properties.clone(),
108 }
109 }
110}
111
/// An iceberg [`Catalog`] whose operations are forwarded over JNI to a Java catalog object.
#[derive(Debug)]
pub struct JniCatalog {
    /// Global reference to the Java `JniCatalogWrapper` object every call is dispatched to.
    java_catalog: GlobalRef,
    /// JVM passed to `execute_with_jni_env` to obtain a JNI environment for each call.
    jvm: &'static JavaVM,
    /// Properties applied to every `FileIO` built for tables returned by this catalog.
    file_io_props: HashMap<String, String>,
}
118
119#[async_trait]
120impl Catalog for JniCatalog {
121 async fn list_namespaces(
123 &self,
124 _parent: Option<&NamespaceIdent>,
125 ) -> iceberg::Result<Vec<NamespaceIdent>> {
126 execute_with_jni_env(self.jvm, |env| {
127 let result_json =
128 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
129 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
130
131 let rust_json_str = jobj_to_str(env, result_json)?;
132
133 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
134
135 Ok(resp.namespaces)
136 })
137 .map_err(|e| {
138 iceberg::Error::new(
139 iceberg::ErrorKind::Unexpected,
140 "Failed to list iceberg namespaces.",
141 )
142 .with_source(e)
143 })
144 }
145
146 async fn create_namespace(
148 &self,
149 namespace: &iceberg::NamespaceIdent,
150 _properties: HashMap<String, String>,
151 ) -> iceberg::Result<iceberg::Namespace> {
152 execute_with_jni_env(self.jvm, |env| {
153 let namespace_jstr = if namespace.is_empty() {
154 env.new_string("").unwrap()
155 } else {
156 if namespace.len() > 1 {
157 bail!("Namespace with more than one level is not supported!")
158 }
159 env.new_string(&namespace[0]).unwrap()
160 };
161
162 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
163 &namespace_jstr)
164 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
165
166 Ok(Namespace::new(namespace.clone()))
167 })
168 .map_err(|e| {
169 iceberg::Error::new(
170 iceberg::ErrorKind::Unexpected,
171 "Failed to create namespace.",
172 )
173 .with_source(e)
174 })
175 }
176
177 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
179 todo!()
180 }
181
182 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
184 execute_with_jni_env(self.jvm, |env| {
185 let namespace_jstr = if namespace.is_empty() {
186 env.new_string("").unwrap()
187 } else {
188 if namespace.len() > 1 {
189 bail!("Namespace with more than one level is not supported!")
190 }
191 env.new_string(&namespace[0]).unwrap()
192 };
193
194 let exists =
195 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
196 &namespace_jstr)
197 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
198
199 Ok(exists)
200 })
201 .map_err(|e| {
202 iceberg::Error::new(
203 iceberg::ErrorKind::Unexpected,
204 "Failed to check namespace exists.",
205 )
206 .with_source(e)
207 })
208 }
209
210 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
212 todo!()
213 }
214
215 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
217 execute_with_jni_env(self.jvm, |env| {
218 let namespace_jstr = if namespace.is_empty() {
219 env.new_string("").unwrap()
220 } else {
221 if namespace.len() > 1 {
222 bail!("Namespace with more than one level is not supported!")
223 }
224 env.new_string(&namespace[0]).unwrap()
225 };
226
227 let result_json =
228 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
229 &namespace_jstr)
230 .with_context(|| {
231 format!("Failed to list iceberg tables in namespace: {}", namespace)
232 })?;
233
234 let rust_json_str = jobj_to_str(env, result_json)?;
235
236 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
237
238 Ok(resp.identifiers)
239 })
240 .map_err(|e| {
241 iceberg::Error::new(
242 iceberg::ErrorKind::Unexpected,
243 "Failed to list iceberg tables.",
244 )
245 .with_source(e)
246 })
247 }
248
249 async fn update_namespace(
250 &self,
251 _namespace: &NamespaceIdent,
252 _properties: HashMap<String, String>,
253 ) -> iceberg::Result<()> {
254 todo!()
255 }
256
257 async fn create_table(
259 &self,
260 namespace: &NamespaceIdent,
261 creation: TableCreation,
262 ) -> iceberg::Result<Table> {
263 execute_with_jni_env(self.jvm, |env| {
264 let namespace_jstr = if namespace.is_empty() {
265 env.new_string("").unwrap()
266 } else {
267 if namespace.len() > 1 {
268 bail!("Namespace with more than one level is not supported!")
269 }
270 env.new_string(&namespace[0]).unwrap()
271 };
272
273 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
274
275 let creation_jstr = env.new_string(&creation_str).unwrap();
276
277 let result_json =
278 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
279 &namespace_jstr, &creation_jstr)
280 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
281
282 let rust_json_str = jobj_to_str(env, result_json)?;
283
284 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
285
286 let metadata_location = resp.metadata_location.ok_or_else(|| {
287 iceberg::Error::new(
288 iceberg::ErrorKind::FeatureUnsupported,
289 "Loading uncommitted table is not supported!",
290 )
291 })?;
292
293 let table_metadata = resp.metadata;
294
295 let file_io = FileIO::from_path(&metadata_location)?
296 .with_props(self.file_io_props.iter())
297 .build()?;
298
299 Ok(Table::builder()
300 .file_io(file_io)
301 .identifier(TableIdent::new(namespace.clone(), creation.name))
302 .metadata(table_metadata)
303 .build())
304 })
305 .map_err(|e| {
306 iceberg::Error::new(
307 iceberg::ErrorKind::Unexpected,
308 "Failed to create iceberg table.",
309 )
310 .with_source(e)
311 })?
312 }
313
314 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
316 execute_with_jni_env(self.jvm, |env| {
317 let table_name_str = format!(
318 "{}.{}",
319 table.namespace().clone().inner().into_iter().join("."),
320 table.name()
321 );
322
323 let table_name_jstr = env.new_string(&table_name_str).unwrap();
324
325 let result_json =
326 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
327 &table_name_jstr)
328 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
329
330 let rust_json_str = jobj_to_str(env, result_json)?;
331
332 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
333
334 let metadata_location = resp.metadata_location.ok_or_else(|| {
335 iceberg::Error::new(
336 iceberg::ErrorKind::FeatureUnsupported,
337 "Loading uncommitted table is not supported!",
338 )
339 })?;
340
341 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
342
343 let table_metadata = resp.metadata;
344
345 let file_io = FileIO::from_path(&metadata_location)?
346 .with_props(self.file_io_props.iter())
347 .build()?;
348
349 Ok(Table::builder()
350 .file_io(file_io)
351 .identifier(table.clone())
352 .metadata(table_metadata)
353 .build())
354 })
355 .map_err(|e| {
356 iceberg::Error::new(
357 iceberg::ErrorKind::Unexpected,
358 "Failed to load iceberg table.",
359 )
360 .with_source(e)
361 })?
362 }
363
364 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
366 execute_with_jni_env(self.jvm, |env| {
367 let table_name_str = format!(
368 "{}.{}",
369 table.namespace().clone().inner().into_iter().join("."),
370 table.name()
371 );
372
373 let table_name_jstr = env.new_string(&table_name_str).unwrap();
374
375 call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
376 &table_name_jstr)
377 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
378
379 Ok(())
380 })
381 .map_err(|e| {
382 iceberg::Error::new(
383 iceberg::ErrorKind::Unexpected,
384 "Failed to drop iceberg table.",
385 )
386 .with_source(e)
387 })
388 }
389
390 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
392 execute_with_jni_env(self.jvm, |env| {
393 let table_name_str = format!(
394 "{}.{}",
395 table.namespace().clone().inner().into_iter().join("."),
396 table.name()
397 );
398
399 let table_name_jstr = env.new_string(&table_name_str).unwrap();
400
401 let exists =
402 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
403 &table_name_jstr)
404 .with_context(|| {
405 format!("Failed to check iceberg table exists: {table_name_str}")
406 })?;
407
408 Ok(exists)
409 })
410 .map_err(|e| {
411 iceberg::Error::new(
412 iceberg::ErrorKind::Unexpected,
413 "Failed to check iceberg table exists.",
414 )
415 .with_source(e)
416 })
417 }
418
419 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
421 todo!()
422 }
423
424 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
426 execute_with_jni_env(self.jvm, |env| {
427 let requirements = commit.take_requirements();
428 let updates = commit.take_updates();
429 let request = CommitTableRequest {
430 identifier: commit.identifier().clone(),
431 requirements,
432 updates,
433 };
434 let request_str = serde_json::to_string(&request)?;
435
436 let request_jni_str = env.new_string(&request_str).with_context(|| {
437 format!("Failed to create jni string from request json: {request_str}.")
438 })?;
439
440 let result_json =
441 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
442 &request_jni_str)
443 .with_context(|| {
444 format!("Failed to update iceberg table: {}", commit.identifier())
445 })?;
446
447 let rust_json_str = jobj_to_str(env, result_json)?;
448
449 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
450
451 tracing::info!(
452 "Table metadata location of {} is {}",
453 commit.identifier(),
454 response.metadata_location
455 );
456
457 let table_metadata = response.metadata;
458
459 let file_io = FileIO::from_path(&response.metadata_location)?
460 .with_props(self.file_io_props.iter())
461 .build()?;
462
463 Ok(Table::builder()
464 .file_io(file_io)
465 .identifier(commit.identifier().clone())
466 .metadata(table_metadata)
467 .build()?)
468 })
469 .map_err(|e| {
470 iceberg::Error::new(
471 iceberg::ErrorKind::Unexpected,
472 "Failed to update iceberg table.",
473 )
474 .with_source(e)
475 })
476 }
477}
478
479impl Drop for JniCatalog {
480 fn drop(&mut self) {
481 let _ = execute_with_jni_env(self.jvm, |env| {
482 call_method!(env, self.java_catalog.as_obj(), {void close()})
483 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
484 Ok(())
485 })
486 .inspect_err(
487 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
488 );
489 }
490}
491
492impl JniCatalog {
493 fn build(
494 file_io_props: HashMap<String, String>,
495 name: impl ToString,
496 catalog_impl: impl ToString,
497 java_catalog_props: HashMap<String, String>,
498 ) -> ConnectorResult<Self> {
499 let jvm = JVM.get_or_init()?;
500
501 execute_with_jni_env(jvm, |env| {
502 let props = env.new_object_array(
504 (java_catalog_props.len() * 2) as i32,
505 "java/lang/String",
506 JObject::null(),
507 )?;
508 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
509 let key_j_str = env.new_string(key)?;
510 let value_j_str = env.new_string(value)?;
511 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
512 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
513 }
514
515 let jni_catalog_wrapper = env
516 .call_static_method(
517 "com/risingwave/connector/catalog/JniCatalogWrapper",
518 "create",
519 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
520 &[
521 (&env.new_string(name.to_string()).unwrap()).into(),
522 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
523 (&props).into(),
524 ],
525 )?;
526
527 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
528
529 Ok(Self {
530 java_catalog: jni_catalog,
531 jvm,
532 file_io_props,
533 })
534 })
535 .map_err(Into::into)
536 }
537
538 pub fn build_catalog(
539 file_io_props: HashMap<String, String>,
540 name: impl ToString,
541 catalog_impl: impl ToString,
542 java_catalog_props: HashMap<String, String>,
543 ) -> ConnectorResult<Arc<dyn Catalog>> {
544 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
545 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
546 }
547}