1#![expect(
18 clippy::disallowed_types,
19 reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33 TableUpdate,
34};
35use itertools::Itertools;
36use jni::objects::{GlobalRef, JObject};
37use risingwave_common::bail;
38use risingwave_common::global_jvm::Jvm;
39use risingwave_jni_core::call_method;
40use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
41use serde::{Deserialize, Serialize};
42use thiserror_ext::AsReport;
43
44use crate::error::ConnectorResult;
45
46#[derive(Debug, Deserialize)]
47#[serde(rename_all = "kebab-case")]
48struct LoadTableResponse {
49 pub metadata_location: Option<String>,
50 pub metadata: TableMetadata,
51 pub _config: Option<HashMap<String, String>>,
52}
53
54#[derive(Debug, Serialize)]
55#[serde(rename_all = "kebab-case")]
56struct CreateTableRequest {
57 pub name: String,
59 pub location: Option<String>,
61 pub schema: Schema,
63 pub partition_spec: Option<UnboundPartitionSpec>,
65 pub write_order: Option<SortOrder>,
67 pub properties: HashMap<String, String>,
69}
70
71#[derive(Debug, Serialize, Deserialize)]
72struct CommitTableRequest {
73 identifier: TableIdent,
74 requirements: Vec<TableRequirement>,
75 updates: Vec<TableUpdate>,
76}
77
78#[derive(Debug, Serialize, Deserialize)]
79#[serde(rename_all = "kebab-case")]
80struct CommitTableResponse {
81 metadata_location: String,
82 metadata: TableMetadata,
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86#[serde(rename_all = "kebab-case")]
87struct ListNamespacesResponse {
88 namespaces: Vec<NamespaceIdent>,
89 next_page_token: Option<String>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93#[serde(rename_all = "kebab-case")]
94struct ListTablesResponse {
95 identifiers: Vec<TableIdent>,
96 next_page_token: Option<String>,
97}
98
99impl From<&TableCreation> for CreateTableRequest {
100 fn from(value: &TableCreation) -> Self {
101 Self {
102 name: value.name.clone(),
103 location: value.location.clone(),
104 schema: value.schema.clone(),
105 partition_spec: value.partition_spec.clone(),
106 write_order: value.sort_order.clone(),
107 properties: value.properties.clone(),
108 }
109 }
110}
111
112#[derive(Debug)]
113pub struct JniCatalog {
114 java_catalog: GlobalRef,
115 jvm: Jvm,
116 file_io_props: HashMap<String, String>,
117}
118
119#[async_trait]
120impl Catalog for JniCatalog {
121 async fn list_namespaces(
123 &self,
124 _parent: Option<&NamespaceIdent>,
125 ) -> iceberg::Result<Vec<NamespaceIdent>> {
126 execute_with_jni_env(self.jvm, |env| {
127 let result_json =
128 call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
129 .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
130
131 let rust_json_str = jobj_to_str(env, result_json)?;
132
133 let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
134
135 Ok(resp.namespaces)
136 })
137 .map_err(|e| {
138 iceberg::Error::new(
139 iceberg::ErrorKind::Unexpected,
140 "Failed to list iceberg namespaces.",
141 )
142 .with_source(e)
143 })
144 }
145
146 async fn create_namespace(
148 &self,
149 namespace: &iceberg::NamespaceIdent,
150 _properties: HashMap<String, String>,
151 ) -> iceberg::Result<iceberg::Namespace> {
152 execute_with_jni_env(self.jvm, |env| {
153 let namespace_jstr = if namespace.is_empty() {
154 env.new_string("").unwrap()
155 } else {
156 if namespace.len() > 1 {
157 bail!("Namespace with more than one level is not supported!")
158 }
159 env.new_string(&namespace[0]).unwrap()
160 };
161
162 call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
163 &namespace_jstr)
164 .with_context(|| format!("Failed to create namespace: {namespace}"))?;
165
166 Ok(Namespace::new(namespace.clone()))
167 })
168 .map_err(|e| {
169 iceberg::Error::new(
170 iceberg::ErrorKind::Unexpected,
171 "Failed to create namespace.",
172 )
173 .with_source(e)
174 })
175 }
176
177 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
179 todo!()
180 }
181
182 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
184 execute_with_jni_env(self.jvm, |env| {
185 let namespace_jstr = if namespace.is_empty() {
186 env.new_string("").unwrap()
187 } else {
188 if namespace.len() > 1 {
189 bail!("Namespace with more than one level is not supported!")
190 }
191 env.new_string(&namespace[0]).unwrap()
192 };
193
194 let exists =
195 call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
196 &namespace_jstr)
197 .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
198
199 Ok(exists)
200 })
201 .map_err(|e| {
202 iceberg::Error::new(
203 iceberg::ErrorKind::Unexpected,
204 "Failed to check namespace exists.",
205 )
206 .with_source(e)
207 })
208 }
209
210 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
212 todo!()
213 }
214
215 async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
217 execute_with_jni_env(self.jvm, |env| {
218 let namespace_jstr = if namespace.is_empty() {
219 env.new_string("").unwrap()
220 } else {
221 if namespace.len() > 1 {
222 bail!("Namespace with more than one level is not supported!")
223 }
224 env.new_string(&namespace[0]).unwrap()
225 };
226
227 let result_json =
228 call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
229 &namespace_jstr)
230 .with_context(|| {
231 format!("Failed to list iceberg tables in namespace: {}", namespace)
232 })?;
233
234 let rust_json_str = jobj_to_str(env, result_json)?;
235
236 let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
237
238 Ok(resp.identifiers)
239 })
240 .map_err(|e| {
241 iceberg::Error::new(
242 iceberg::ErrorKind::Unexpected,
243 "Failed to list iceberg tables.",
244 )
245 .with_source(e)
246 })
247 }
248
249 async fn update_namespace(
250 &self,
251 _namespace: &NamespaceIdent,
252 _properties: HashMap<String, String>,
253 ) -> iceberg::Result<()> {
254 todo!()
255 }
256
257 async fn create_table(
259 &self,
260 namespace: &NamespaceIdent,
261 creation: TableCreation,
262 ) -> iceberg::Result<Table> {
263 execute_with_jni_env(self.jvm, |env| {
264 let namespace_jstr = if namespace.is_empty() {
265 env.new_string("").unwrap()
266 } else {
267 if namespace.len() > 1 {
268 bail!("Namespace with more than one level is not supported!")
269 }
270 env.new_string(&namespace[0]).unwrap()
271 };
272
273 let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
274
275 let creation_jstr = env.new_string(&creation_str).unwrap();
276
277 let result_json =
278 call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
279 &namespace_jstr, &creation_jstr)
280 .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
281
282 let rust_json_str = jobj_to_str(env, result_json)?;
283
284 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
285
286 let metadata_location = resp.metadata_location.ok_or_else(|| {
287 iceberg::Error::new(
288 iceberg::ErrorKind::FeatureUnsupported,
289 "Loading uncommitted table is not supported!",
290 )
291 })?;
292
293 let table_metadata = resp.metadata;
294
295 let file_io = FileIO::from_path(&metadata_location)?
296 .with_props(self.file_io_props.iter())
297 .build()?;
298
299 Ok(Table::builder()
300 .file_io(file_io)
301 .identifier(TableIdent::new(namespace.clone(), creation.name))
302 .metadata(table_metadata)
303 .build())
304 })
305 .map_err(|e| {
306 iceberg::Error::new(
307 iceberg::ErrorKind::Unexpected,
308 "Failed to create iceberg table.",
309 )
310 .with_source(e)
311 })?
312 }
313
314 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
316 execute_with_jni_env(self.jvm, |env| {
317 let table_name_str = format!(
318 "{}.{}",
319 table.namespace().clone().inner().into_iter().join("."),
320 table.name()
321 );
322
323 let table_name_jstr = env.new_string(&table_name_str).unwrap();
324
325 let result_json =
326 call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
327 &table_name_jstr)
328 .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
329
330 let rust_json_str = jobj_to_str(env, result_json)?;
331
332 let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
333
334 let metadata_location = resp.metadata_location.ok_or_else(|| {
335 iceberg::Error::new(
336 iceberg::ErrorKind::FeatureUnsupported,
337 "Loading uncommitted table is not supported!",
338 )
339 })?;
340
341 tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
342
343 let table_metadata = resp.metadata;
344
345 let file_io = FileIO::from_path(&metadata_location)?
346 .with_props(self.file_io_props.iter())
347 .build()?;
348
349 Ok(Table::builder()
350 .file_io(file_io)
351 .identifier(table.clone())
352 .metadata(table_metadata)
353 .build())
354 })
355 .map_err(|e| {
356 iceberg::Error::new(
357 iceberg::ErrorKind::Unexpected,
358 "Failed to load iceberg table.",
359 )
360 .with_source(e)
361 })?
362 }
363
364 async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
366 let jvm = self.jvm;
367 let table = table.to_owned();
368 let java_catalog = self.java_catalog.clone();
369 tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
371 execute_with_jni_env(jvm, |env| {
372 let table_name_str = format!(
373 "{}.{}",
374 table.namespace().clone().inner().into_iter().join("."),
375 table.name()
376 );
377
378 let table_name_jstr = env.new_string(&table_name_str).unwrap();
379
380 call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
381 &table_name_jstr)
382 .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
383
384 Ok(())
385 })
386 .map_err(|e| {
387 iceberg::Error::new(
388 iceberg::ErrorKind::Unexpected,
389 "Failed to drop iceberg table.",
390 )
391 .with_source(e)
392 })
393 })
394 .await
395 .map_err(|e| {
396 iceberg::Error::new(
397 iceberg::ErrorKind::Unexpected,
398 "Failed to drop iceberg table.",
399 )
400 .with_source(e)
401 })?
402 }
403
404 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
406 execute_with_jni_env(self.jvm, |env| {
407 let table_name_str = format!(
408 "{}.{}",
409 table.namespace().clone().inner().into_iter().join("."),
410 table.name()
411 );
412
413 let table_name_jstr = env.new_string(&table_name_str).unwrap();
414
415 let exists =
416 call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
417 &table_name_jstr)
418 .with_context(|| {
419 format!("Failed to check iceberg table exists: {table_name_str}")
420 })?;
421
422 Ok(exists)
423 })
424 .map_err(|e| {
425 iceberg::Error::new(
426 iceberg::ErrorKind::Unexpected,
427 "Failed to check iceberg table exists.",
428 )
429 .with_source(e)
430 })
431 }
432
433 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
435 todo!()
436 }
437
438 async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
440 execute_with_jni_env(self.jvm, |env| {
441 let requirements = commit.take_requirements();
442 let updates = commit.take_updates();
443 let request = CommitTableRequest {
444 identifier: commit.identifier().clone(),
445 requirements,
446 updates,
447 };
448 let request_str = serde_json::to_string(&request)?;
449
450 let request_jni_str = env.new_string(&request_str).with_context(|| {
451 format!("Failed to create jni string from request json: {request_str}.")
452 })?;
453
454 let result_json =
455 call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
456 &request_jni_str)
457 .with_context(|| {
458 format!("Failed to update iceberg table: {}", commit.identifier())
459 })?;
460
461 let rust_json_str = jobj_to_str(env, result_json)?;
462
463 let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
464
465 tracing::info!(
466 "Table metadata location of {} is {}",
467 commit.identifier(),
468 response.metadata_location
469 );
470
471 let table_metadata = response.metadata;
472
473 let file_io = FileIO::from_path(&response.metadata_location)?
474 .with_props(self.file_io_props.iter())
475 .build()?;
476
477 Ok(Table::builder()
478 .file_io(file_io)
479 .identifier(commit.identifier().clone())
480 .metadata(table_metadata)
481 .build()?)
482 })
483 .map_err(|e| {
484 iceberg::Error::new(
485 iceberg::ErrorKind::Unexpected,
486 "Failed to update iceberg table.",
487 )
488 .with_source(e)
489 })
490 }
491}
492
493impl Drop for JniCatalog {
494 fn drop(&mut self) {
495 let _ = execute_with_jni_env(self.jvm, |env| {
496 call_method!(env, self.java_catalog.as_obj(), {void close()})
497 .with_context(|| "Failed to close iceberg catalog".to_owned())?;
498 Ok(())
499 })
500 .inspect_err(
501 |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
502 );
503 }
504}
505
506impl JniCatalog {
507 fn build(
508 file_io_props: HashMap<String, String>,
509 name: impl ToString,
510 catalog_impl: impl ToString,
511 java_catalog_props: HashMap<String, String>,
512 ) -> ConnectorResult<Self> {
513 let jvm = Jvm::get_or_init()?;
514
515 execute_with_jni_env(jvm, |env| {
516 let props = env.new_object_array(
518 (java_catalog_props.len() * 2) as i32,
519 "java/lang/String",
520 JObject::null(),
521 )?;
522 for (i, (key, value)) in java_catalog_props.iter().enumerate() {
523 let key_j_str = env.new_string(key)?;
524 let value_j_str = env.new_string(value)?;
525 env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
526 env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
527 }
528
529 let jni_catalog_wrapper = env
530 .call_static_method(
531 "com/risingwave/connector/catalog/JniCatalogWrapper",
532 "create",
533 "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
534 &[
535 (&env.new_string(name.to_string()).unwrap()).into(),
536 (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
537 (&props).into(),
538 ],
539 )?;
540
541 let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
542
543 Ok(Self {
544 java_catalog: jni_catalog,
545 jvm,
546 file_io_props,
547 })
548 })
549 .map_err(Into::into)
550 }
551
552 pub fn build_catalog(
553 file_io_props: HashMap<String, String>,
554 name: impl ToString,
555 catalog_impl: impl ToString,
556 java_catalog_props: HashMap<String, String>,
557 ) -> ConnectorResult<Arc<dyn Catalog>> {
558 let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
559 Ok(Arc::new(catalog) as Arc<dyn Catalog>)
560 }
561}