1use std::collections::HashMap;
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use anyhow::Context;
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
25use iceberg::table::Table;
26use iceberg::{
27 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
28 TableUpdate,
29};
30use itertools::Itertools;
31use jni::JavaVM;
32use jni::objects::{GlobalRef, JObject};
33use risingwave_common::bail;
34use risingwave_jni_core::call_method;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env, jobj_to_str};
36use serde::{Deserialize, Serialize};
37use thiserror_ext::AsReport;
38
39use crate::error::ConnectorResult;
40
/// JSON payload deserialized from the string returned by the Java catalog's
/// `createTable` and `loadTable` calls.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoadTableResponse {
    /// Location of the table's metadata file. The callers in this file reject a
    /// response without it ("Loading uncommitted table is not supported!").
    pub metadata_location: Option<String>,
    /// Table metadata carried in the response.
    pub metadata: TableMetadata,
    /// Not read anywhere in the code visible here.
    // NOTE(review): with `rename_all = "kebab-case"` this field presumably maps to the
    // JSON key `-config` rather than `config` — confirm whether that is intended.
    pub _config: Option<HashMap<String, String>>,
}
48
/// JSON request body serialized and handed to the Java catalog's `createTable` call.
///
/// Built from an iceberg [`TableCreation`]; keys are emitted in kebab-case.
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
    /// Name of the table to create.
    pub name: String,
    /// Optional table location.
    pub location: Option<String>,
    /// Schema of the new table.
    pub schema: Schema,
    /// Optional partition spec, still unbound.
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; filled from `TableCreation::sort_order`.
    pub write_order: Option<SortOrder>,
    /// Table properties.
    pub properties: HashMap<String, String>,
}
65
/// JSON request body serialized and handed to the Java catalog's `updateTable` call.
///
/// Unlike the other payloads in this file, no `rename_all` is applied, so the keys are
/// the field names as written.
#[derive(Debug, Serialize, Deserialize)]
struct CommitTableRequest {
    /// Table the commit applies to.
    identifier: TableIdent,
    /// Requirements taken from the `TableCommit`.
    requirements: Vec<TableRequirement>,
    /// Updates taken from the `TableCommit`.
    updates: Vec<TableUpdate>,
}
72
/// JSON payload deserialized from the string returned by the Java catalog's
/// `updateTable` call.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CommitTableResponse {
    /// Location of the table's metadata file after the commit; also used to pick the
    /// `FileIO` for the returned table.
    metadata_location: String,
    /// Table metadata after the commit.
    metadata: TableMetadata,
}
79
/// JSON payload deserialized from the string returned by the Java catalog's
/// `listNamespaces` call.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListNamespacesResponse {
    /// Namespaces returned by the catalog.
    namespaces: Vec<NamespaceIdent>,
    /// Parsed but not read by the code visible in this file.
    next_page_token: Option<String>,
}
86
/// JSON payload deserialized from the string returned by the Java catalog's
/// `listTables` call.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
struct ListTablesResponse {
    /// Identifiers of the tables returned by the catalog.
    identifiers: Vec<TableIdent>,
    /// Parsed but not read by the code visible in this file.
    next_page_token: Option<String>,
}
93
94impl From<&TableCreation> for CreateTableRequest {
95 fn from(value: &TableCreation) -> Self {
96 Self {
97 name: value.name.clone(),
98 location: value.location.clone(),
99 schema: value.schema.clone(),
100 partition_spec: value.partition_spec.clone(),
101 write_order: value.sort_order.clone(),
102 properties: value.properties.clone(),
103 }
104 }
105}
106
/// Iceberg [`Catalog`] implementation that forwards each operation through JNI to a Java
/// `JniCatalogWrapper` object.
#[derive(Debug)]
pub struct JniCatalog {
    /// Global reference to the Java catalog wrapper; `close()` is called on it on drop.
    java_catalog: GlobalRef,
    /// JVM used to run every JNI call.
    jvm: &'static JavaVM,
    /// Properties passed to the `FileIO` builder for each table returned by this catalog.
    file_io_props: HashMap<String, String>,
}
113
114#[async_trait]
115impl Catalog for JniCatalog {
116 async fn list_namespaces(
118 &self,
119 _parent: Option<&NamespaceIdent>,
120 ) -> iceberg::Result<Vec<NamespaceIdent>> {
121 execute_with_jni_env(self.jvm, |env| {
122 let result_json =
123 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
124 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
125
126 let rust_json_str = jobj_to_str(env, result_json)?;
127
128 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
129
130 Ok(resp.namespaces)
131 })
132 .map_err(|e| {
133 iceberg::Error::new(
134 iceberg::ErrorKind::Unexpected,
135 "Failed to list iceberg namespaces.",
136 )
137 .with_source(e)
138 })
139 }
140
141 async fn create_namespace(
143 &self,
144 namespace: &iceberg::NamespaceIdent,
145 _properties: HashMap<String, String>,
146 ) -> iceberg::Result<iceberg::Namespace> {
147 execute_with_jni_env(self.jvm, |env| {
148 let namespace_jstr = if namespace.is_empty() {
149 env.new_string("").unwrap()
150 } else {
151 if namespace.len() > 1 {
152 bail!("Namespace with more than one level is not supported!")
153 }
154 env.new_string(&namespace[0]).unwrap()
155 };
156
157 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
158 &namespace_jstr)
159 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
160
161 Ok(Namespace::new(namespace.clone()))
162 })
163 .map_err(|e| {
164 iceberg::Error::new(
165 iceberg::ErrorKind::Unexpected,
166 "Failed to create namespace.",
167 )
168 .with_source(e)
169 })
170 }
171
172 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
174 todo!()
175 }
176
177 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
179 execute_with_jni_env(self.jvm, |env| {
180 let namespace_jstr = if namespace.is_empty() {
181 env.new_string("").unwrap()
182 } else {
183 if namespace.len() > 1 {
184 bail!("Namespace with more than one level is not supported!")
185 }
186 env.new_string(&namespace[0]).unwrap()
187 };
188
189 let exists =
190 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
191 &namespace_jstr)
192 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
193
194 Ok(exists)
195 })
196 .map_err(|e| {
197 iceberg::Error::new(
198 iceberg::ErrorKind::Unexpected,
199 "Failed to check namespace exists.",
200 )
201 .with_source(e)
202 })
203 }
204
205 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
207 todo!()
208 }
209
210 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
212 execute_with_jni_env(self.jvm, |env| {
213 let namespace_jstr = if namespace.is_empty() {
214 env.new_string("").unwrap()
215 } else {
216 if namespace.len() > 1 {
217 bail!("Namespace with more than one level is not supported!")
218 }
219 env.new_string(&namespace[0]).unwrap()
220 };
221
222 let result_json =
223 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
224 &namespace_jstr)
225 .with_context(|| {
226 format!("Failed to list iceberg tables in namespace: {}", namespace)
227 })?;
228
229 let rust_json_str = jobj_to_str(env, result_json)?;
230
231 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
232
233 Ok(resp.identifiers)
234 })
235 .map_err(|e| {
236 iceberg::Error::new(
237 iceberg::ErrorKind::Unexpected,
238 "Failed to list iceberg tables.",
239 )
240 .with_source(e)
241 })
242 }
243
244 async fn update_namespace(
245 &self,
246 _namespace: &NamespaceIdent,
247 _properties: HashMap<String, String>,
248 ) -> iceberg::Result<()> {
249 todo!()
250 }
251
252 async fn create_table(
254 &self,
255 namespace: &NamespaceIdent,
256 creation: TableCreation,
257 ) -> iceberg::Result<Table> {
258 execute_with_jni_env(self.jvm, |env| {
259 let namespace_jstr = if namespace.is_empty() {
260 env.new_string("").unwrap()
261 } else {
262 if namespace.len() > 1 {
263 bail!("Namespace with more than one level is not supported!")
264 }
265 env.new_string(&namespace[0]).unwrap()
266 };
267
268 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
269
270 let creation_jstr = env.new_string(&creation_str).unwrap();
271
272 let result_json =
273 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
274 &namespace_jstr, &creation_jstr)
275 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
276
277 let rust_json_str = jobj_to_str(env, result_json)?;
278
279 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
280
281 let metadata_location = resp.metadata_location.ok_or_else(|| {
282 iceberg::Error::new(
283 iceberg::ErrorKind::FeatureUnsupported,
284 "Loading uncommitted table is not supported!",
285 )
286 })?;
287
288 let table_metadata = resp.metadata;
289
290 let file_io = FileIO::from_path(&metadata_location)?
291 .with_props(self.file_io_props.iter())
292 .build()?;
293
294 Ok(Table::builder()
295 .file_io(file_io)
296 .identifier(TableIdent::new(namespace.clone(), creation.name))
297 .metadata(table_metadata)
298 .build())
299 })
300 .map_err(|e| {
301 iceberg::Error::new(
302 iceberg::ErrorKind::Unexpected,
303 "Failed to create iceberg table.",
304 )
305 .with_source(e)
306 })?
307 }
308
309 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
311 execute_with_jni_env(self.jvm, |env| {
312 let table_name_str = format!(
313 "{}.{}",
314 table.namespace().clone().inner().into_iter().join("."),
315 table.name()
316 );
317
318 let table_name_jstr = env.new_string(&table_name_str).unwrap();
319
320 let result_json =
321 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
322 &table_name_jstr)
323 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
324
325 let rust_json_str = jobj_to_str(env, result_json)?;
326
327 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
328
329 let metadata_location = resp.metadata_location.ok_or_else(|| {
330 iceberg::Error::new(
331 iceberg::ErrorKind::FeatureUnsupported,
332 "Loading uncommitted table is not supported!",
333 )
334 })?;
335
336 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
337
338 let table_metadata = resp.metadata;
339
340 let file_io = FileIO::from_path(&metadata_location)?
341 .with_props(self.file_io_props.iter())
342 .build()?;
343
344 Ok(Table::builder()
345 .file_io(file_io)
346 .identifier(table.clone())
347 .metadata(table_metadata)
348 .build())
349 })
350 .map_err(|e| {
351 iceberg::Error::new(
352 iceberg::ErrorKind::Unexpected,
353 "Failed to load iceberg table.",
354 )
355 .with_source(e)
356 })?
357 }
358
359 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
361 execute_with_jni_env(self.jvm, |env| {
362 let table_name_str = format!(
363 "{}.{}",
364 table.namespace().clone().inner().into_iter().join("."),
365 table.name()
366 );
367
368 let table_name_jstr = env.new_string(&table_name_str).unwrap();
369
370 call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
371 &table_name_jstr)
372 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
373
374 Ok(())
375 })
376 .map_err(|e| {
377 iceberg::Error::new(
378 iceberg::ErrorKind::Unexpected,
379 "Failed to drop iceberg table.",
380 )
381 .with_source(e)
382 })
383 }
384
385 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
387 execute_with_jni_env(self.jvm, |env| {
388 let table_name_str = format!(
389 "{}.{}",
390 table.namespace().clone().inner().into_iter().join("."),
391 table.name()
392 );
393
394 let table_name_jstr = env.new_string(&table_name_str).unwrap();
395
396 let exists =
397 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
398 &table_name_jstr)
399 .with_context(|| {
400 format!("Failed to check iceberg table exists: {table_name_str}")
401 })?;
402
403 Ok(exists)
404 })
405 .map_err(|e| {
406 iceberg::Error::new(
407 iceberg::ErrorKind::Unexpected,
408 "Failed to check iceberg table exists.",
409 )
410 .with_source(e)
411 })
412 }
413
414 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
416 todo!()
417 }
418
419 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
421 execute_with_jni_env(self.jvm, |env| {
422 let requirements = commit.take_requirements();
423 let updates = commit.take_updates();
424 let request = CommitTableRequest {
425 identifier: commit.identifier().clone(),
426 requirements,
427 updates,
428 };
429 let request_str = serde_json::to_string(&request)?;
430
431 let request_jni_str = env.new_string(&request_str).with_context(|| {
432 format!("Failed to create jni string from request json: {request_str}.")
433 })?;
434
435 let result_json =
436 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
437 &request_jni_str)
438 .with_context(|| {
439 format!("Failed to update iceberg table: {}", commit.identifier())
440 })?;
441
442 let rust_json_str = jobj_to_str(env, result_json)?;
443
444 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
445
446 tracing::info!(
447 "Table metadata location of {} is {}",
448 commit.identifier(),
449 response.metadata_location
450 );
451
452 let table_metadata = response.metadata;
453
454 let file_io = FileIO::from_path(&response.metadata_location)?
455 .with_props(self.file_io_props.iter())
456 .build()?;
457
458 Ok(Table::builder()
459 .file_io(file_io)
460 .identifier(commit.identifier().clone())
461 .metadata(table_metadata)
462 .build()?)
463 })
464 .map_err(|e| {
465 iceberg::Error::new(
466 iceberg::ErrorKind::Unexpected,
467 "Failed to update iceberg table.",
468 )
469 .with_source(e)
470 })
471 }
472}
473
474impl Drop for JniCatalog {
475 fn drop(&mut self) {
476 let _ = execute_with_jni_env(self.jvm, |env| {
477 call_method!(env, self.java_catalog.as_obj(), {void close()})
478 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
479 Ok(())
480 })
481 .inspect_err(
482 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
483 );
484 }
485}
486
487impl JniCatalog {
488 fn build(
489 file_io_props: HashMap<String, String>,
490 name: impl ToString,
491 catalog_impl: impl ToString,
492 java_catalog_props: HashMap<String, String>,
493 ) -> ConnectorResult<Self> {
494 let jvm = JVM.get_or_init()?;
495
496 execute_with_jni_env(jvm, |env| {
497 let props = env.new_object_array(
499 (java_catalog_props.len() * 2) as i32,
500 "java/lang/String",
501 JObject::null(),
502 )?;
503 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
504 let key_j_str = env.new_string(key)?;
505 let value_j_str = env.new_string(value)?;
506 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
507 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
508 }
509
510 let jni_catalog_wrapper = env
511 .call_static_method(
512 "com/risingwave/connector/catalog/JniCatalogWrapper",
513 "create",
514 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
515 &[
516 (&env.new_string(name.to_string()).unwrap()).into(),
517 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
518 (&props).into(),
519 ],
520 )?;
521
522 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
523
524 Ok(Self {
525 java_catalog: jni_catalog,
526 jvm,
527 file_io_props,
528 })
529 })
530 .map_err(Into::into)
531 }
532
533 pub fn build_catalog(
534 file_io_props: HashMap<String, String>,
535 name: impl ToString,
536 catalog_impl: impl ToString,
537 java_catalog_props: HashMap<String, String>,
538 ) -> ConnectorResult<Arc<dyn Catalog>> {
539 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
540 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
541 }
542}