risingwave_connector/connector_common/iceberg/
jni_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module provides an Iceberg catalog backed by a Java catalog accessed through JNI.
16
17#![expect(
18    clippy::disallowed_types,
19    reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33    TableUpdate,
34};
35use itertools::Itertools;
36use jni::objects::{GlobalRef, JObject};
37use risingwave_common::bail;
38use risingwave_common::global_jvm::Jvm;
39use risingwave_jni_core::call_method;
40use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
41use serde::{Deserialize, Serialize};
42use thiserror_ext::AsReport;
43
44use crate::error::ConnectorResult;
45
46#[derive(Debug, Deserialize)]
47#[serde(rename_all = "kebab-case")]
48struct LoadTableResponse {
49    pub metadata_location: Option<String>,
50    pub metadata: TableMetadata,
51    pub _config: Option<HashMap<String, String>>,
52}
53
54#[derive(Debug, Serialize)]
55#[serde(rename_all = "kebab-case")]
56struct CreateTableRequest {
57    /// The name of the table.
58    pub name: String,
59    /// The location of the table.
60    pub location: Option<String>,
61    /// The schema of the table.
62    pub schema: Schema,
63    /// The partition spec of the table, could be None.
64    pub partition_spec: Option<UnboundPartitionSpec>,
65    /// The sort order of the table.
66    pub write_order: Option<SortOrder>,
67    /// The properties of the table.
68    pub properties: HashMap<String, String>,
69}
70
71#[derive(Debug, Serialize, Deserialize)]
72struct CommitTableRequest {
73    identifier: TableIdent,
74    requirements: Vec<TableRequirement>,
75    updates: Vec<TableUpdate>,
76}
77
78#[derive(Debug, Serialize, Deserialize)]
79#[serde(rename_all = "kebab-case")]
80struct CommitTableResponse {
81    metadata_location: String,
82    metadata: TableMetadata,
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86#[serde(rename_all = "kebab-case")]
87struct ListNamespacesResponse {
88    namespaces: Vec<NamespaceIdent>,
89    next_page_token: Option<String>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93#[serde(rename_all = "kebab-case")]
94struct ListTablesResponse {
95    identifiers: Vec<TableIdent>,
96    next_page_token: Option<String>,
97}
98
99impl From<&TableCreation> for CreateTableRequest {
100    fn from(value: &TableCreation) -> Self {
101        Self {
102            name: value.name.clone(),
103            location: value.location.clone(),
104            schema: value.schema.clone(),
105            partition_spec: value.partition_spec.clone(),
106            write_order: value.sort_order.clone(),
107            properties: value.properties.clone(),
108        }
109    }
110}
111
112#[derive(Debug)]
113pub struct JniCatalog {
114    java_catalog: GlobalRef,
115    jvm: Jvm,
116    file_io_props: HashMap<String, String>,
117}
118
119#[async_trait]
120impl Catalog for JniCatalog {
121    /// List namespaces from the catalog.
122    async fn list_namespaces(
123        &self,
124        _parent: Option<&NamespaceIdent>,
125    ) -> iceberg::Result<Vec<NamespaceIdent>> {
126        execute_with_jni_env(self.jvm, |env| {
127            let result_json =
128                call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
129                    .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
130
131            let rust_json_str = jobj_to_str(env, result_json)?;
132
133            let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
134
135            Ok(resp.namespaces)
136        })
137        .map_err(|e| {
138            iceberg::Error::new(
139                iceberg::ErrorKind::Unexpected,
140                "Failed to list iceberg namespaces.",
141            )
142            .with_source(e)
143        })
144    }
145
146    /// Create a new namespace inside the catalog.
147    async fn create_namespace(
148        &self,
149        namespace: &iceberg::NamespaceIdent,
150        _properties: HashMap<String, String>,
151    ) -> iceberg::Result<iceberg::Namespace> {
152        execute_with_jni_env(self.jvm, |env| {
153            let namespace_jstr = if namespace.is_empty() {
154                env.new_string("").unwrap()
155            } else {
156                if namespace.len() > 1 {
157                    bail!("Namespace with more than one level is not supported!")
158                }
159                env.new_string(&namespace[0]).unwrap()
160            };
161
162            call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
163                &namespace_jstr)
164            .with_context(|| format!("Failed to create namespace: {namespace}"))?;
165
166            Ok(Namespace::new(namespace.clone()))
167        })
168        .map_err(|e| {
169            iceberg::Error::new(
170                iceberg::ErrorKind::Unexpected,
171                "Failed to create namespace.",
172            )
173            .with_source(e)
174        })
175    }
176
177    /// Get a namespace information from the catalog.
178    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
179        todo!()
180    }
181
182    /// Check if namespace exists in catalog.
183    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
184        execute_with_jni_env(self.jvm, |env| {
185            let namespace_jstr = if namespace.is_empty() {
186                env.new_string("").unwrap()
187            } else {
188                if namespace.len() > 1 {
189                    bail!("Namespace with more than one level is not supported!")
190                }
191                env.new_string(&namespace[0]).unwrap()
192            };
193
194            let exists =
195                call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
196                &namespace_jstr)
197                .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
198
199            Ok(exists)
200        })
201        .map_err(|e| {
202            iceberg::Error::new(
203                iceberg::ErrorKind::Unexpected,
204                "Failed to check namespace exists.",
205            )
206            .with_source(e)
207        })
208    }
209
210    /// Drop a namespace from the catalog.
211    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
212        todo!()
213    }
214
215    /// List tables from namespace.
216    async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
217        execute_with_jni_env(self.jvm, |env| {
218            let namespace_jstr = if namespace.is_empty() {
219                env.new_string("").unwrap()
220            } else {
221                if namespace.len() > 1 {
222                    bail!("Namespace with more than one level is not supported!")
223                }
224                env.new_string(&namespace[0]).unwrap()
225            };
226
227            let result_json =
228                call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
229                &namespace_jstr)
230                .with_context(|| {
231                    format!("Failed to list iceberg tables in namespace: {}", namespace)
232                })?;
233
234            let rust_json_str = jobj_to_str(env, result_json)?;
235
236            let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
237
238            Ok(resp.identifiers)
239        })
240        .map_err(|e| {
241            iceberg::Error::new(
242                iceberg::ErrorKind::Unexpected,
243                "Failed to list iceberg  tables.",
244            )
245            .with_source(e)
246        })
247    }
248
249    async fn update_namespace(
250        &self,
251        _namespace: &NamespaceIdent,
252        _properties: HashMap<String, String>,
253    ) -> iceberg::Result<()> {
254        todo!()
255    }
256
257    /// Create a new table inside the namespace.
258    async fn create_table(
259        &self,
260        namespace: &NamespaceIdent,
261        creation: TableCreation,
262    ) -> iceberg::Result<Table> {
263        execute_with_jni_env(self.jvm, |env| {
264            let namespace_jstr = if namespace.is_empty() {
265                env.new_string("").unwrap()
266            } else {
267                if namespace.len() > 1 {
268                    bail!("Namespace with more than one level is not supported!")
269                }
270                env.new_string(&namespace[0]).unwrap()
271            };
272
273            let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
274
275            let creation_jstr = env.new_string(&creation_str).unwrap();
276
277            let result_json =
278                call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
279                &namespace_jstr, &creation_jstr)
280                .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
281
282            let rust_json_str = jobj_to_str(env, result_json)?;
283
284            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
285
286            let metadata_location = resp.metadata_location.ok_or_else(|| {
287                iceberg::Error::new(
288                    iceberg::ErrorKind::FeatureUnsupported,
289                    "Loading uncommitted table is not supported!",
290                )
291            })?;
292
293            let table_metadata = resp.metadata;
294
295            let file_io = FileIO::from_path(&metadata_location)?
296                .with_props(self.file_io_props.iter())
297                .build()?;
298
299            Ok(Table::builder()
300                .file_io(file_io)
301                .identifier(TableIdent::new(namespace.clone(), creation.name))
302                .metadata(table_metadata)
303                .build())
304        })
305        .map_err(|e| {
306            iceberg::Error::new(
307                iceberg::ErrorKind::Unexpected,
308                "Failed to create iceberg table.",
309            )
310            .with_source(e)
311        })?
312    }
313
314    /// Load table from the catalog.
315    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
316        execute_with_jni_env(self.jvm, |env| {
317            let table_name_str = format!(
318                "{}.{}",
319                table.namespace().clone().inner().into_iter().join("."),
320                table.name()
321            );
322
323            let table_name_jstr = env.new_string(&table_name_str).unwrap();
324
325            let result_json =
326                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
327                &table_name_jstr)
328                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
329
330            let rust_json_str = jobj_to_str(env, result_json)?;
331
332            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
333
334            let metadata_location = resp.metadata_location.ok_or_else(|| {
335                iceberg::Error::new(
336                    iceberg::ErrorKind::FeatureUnsupported,
337                    "Loading uncommitted table is not supported!",
338                )
339            })?;
340
341            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
342
343            let table_metadata = resp.metadata;
344
345            let file_io = FileIO::from_path(&metadata_location)?
346                .with_props(self.file_io_props.iter())
347                .build()?;
348
349            Ok(Table::builder()
350                .file_io(file_io)
351                .identifier(table.clone())
352                .metadata(table_metadata)
353                .build())
354        })
355        .map_err(|e| {
356            iceberg::Error::new(
357                iceberg::ErrorKind::Unexpected,
358                "Failed to load iceberg table.",
359            )
360            .with_source(e)
361        })?
362    }
363
364    /// Drop a table from the catalog.
365    async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
366        let jvm = self.jvm;
367        let table = table.to_owned();
368        let java_catalog = self.java_catalog.clone();
369        // spawn blocking the drop table task, since dropping a table by default would purge the data which may take a long time.
370        tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
371            execute_with_jni_env(jvm, |env| {
372                let table_name_str = format!(
373                    "{}.{}",
374                    table.namespace().clone().inner().into_iter().join("."),
375                    table.name()
376                );
377
378                let table_name_jstr = env.new_string(&table_name_str).unwrap();
379
380                call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
381                &table_name_jstr)
382                .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
383
384                Ok(())
385            })
386            .map_err(|e| {
387                iceberg::Error::new(
388                    iceberg::ErrorKind::Unexpected,
389                    "Failed to drop iceberg table.",
390                )
391                .with_source(e)
392            })
393        })
394        .await
395        .map_err(|e| {
396            iceberg::Error::new(
397                iceberg::ErrorKind::Unexpected,
398                "Failed to drop iceberg table.",
399            )
400            .with_source(e)
401        })?
402    }
403
404    /// Check if a table exists in the catalog.
405    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
406        execute_with_jni_env(self.jvm, |env| {
407            let table_name_str = format!(
408                "{}.{}",
409                table.namespace().clone().inner().into_iter().join("."),
410                table.name()
411            );
412
413            let table_name_jstr = env.new_string(&table_name_str).unwrap();
414
415            let exists =
416                call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
417                &table_name_jstr)
418                .with_context(|| {
419                    format!("Failed to check iceberg table exists: {table_name_str}")
420                })?;
421
422            Ok(exists)
423        })
424        .map_err(|e| {
425            iceberg::Error::new(
426                iceberg::ErrorKind::Unexpected,
427                "Failed to check iceberg table exists.",
428            )
429            .with_source(e)
430        })
431    }
432
433    /// Rename a table in the catalog.
434    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
435        todo!()
436    }
437
438    /// Update a table to the catalog.
439    async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
440        execute_with_jni_env(self.jvm, |env| {
441            let requirements = commit.take_requirements();
442            let updates = commit.take_updates();
443            let request = CommitTableRequest {
444                identifier: commit.identifier().clone(),
445                requirements,
446                updates,
447            };
448            let request_str = serde_json::to_string(&request)?;
449
450            let request_jni_str = env.new_string(&request_str).with_context(|| {
451                format!("Failed to create jni string from request json: {request_str}.")
452            })?;
453
454            let result_json =
455                call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
456                &request_jni_str)
457                .with_context(|| {
458                    format!("Failed to update iceberg table: {}", commit.identifier())
459                })?;
460
461            let rust_json_str = jobj_to_str(env, result_json)?;
462
463            let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
464
465            tracing::info!(
466                "Table metadata location of {} is {}",
467                commit.identifier(),
468                response.metadata_location
469            );
470
471            let table_metadata = response.metadata;
472
473            let file_io = FileIO::from_path(&response.metadata_location)?
474                .with_props(self.file_io_props.iter())
475                .build()?;
476
477            Ok(Table::builder()
478                .file_io(file_io)
479                .identifier(commit.identifier().clone())
480                .metadata(table_metadata)
481                .build()?)
482        })
483        .map_err(|e| {
484            iceberg::Error::new(
485                iceberg::ErrorKind::Unexpected,
486                "Failed to update iceberg table.",
487            )
488            .with_source(e)
489        })
490    }
491}
492
493impl Drop for JniCatalog {
494    fn drop(&mut self) {
495        let _ = execute_with_jni_env(self.jvm, |env| {
496            call_method!(env, self.java_catalog.as_obj(), {void close()})
497                .with_context(|| "Failed to close iceberg catalog".to_owned())?;
498            Ok(())
499        })
500        .inspect_err(
501            |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
502        );
503    }
504}
505
506impl JniCatalog {
507    fn build(
508        file_io_props: HashMap<String, String>,
509        name: impl ToString,
510        catalog_impl: impl ToString,
511        java_catalog_props: HashMap<String, String>,
512    ) -> ConnectorResult<Self> {
513        let jvm = Jvm::get_or_init()?;
514
515        execute_with_jni_env(jvm, |env| {
516            // Convert props to string array
517            let props = env.new_object_array(
518                (java_catalog_props.len() * 2) as i32,
519                "java/lang/String",
520                JObject::null(),
521            )?;
522            for (i, (key, value)) in java_catalog_props.iter().enumerate() {
523                let key_j_str = env.new_string(key)?;
524                let value_j_str = env.new_string(value)?;
525                env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
526                env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
527            }
528
529            let jni_catalog_wrapper = env
530                .call_static_method(
531                    "com/risingwave/connector/catalog/JniCatalogWrapper",
532                    "create",
533                    "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
534                    &[
535                        (&env.new_string(name.to_string()).unwrap()).into(),
536                        (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
537                        (&props).into(),
538                    ],
539                )?;
540
541            let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
542
543            Ok(Self {
544                java_catalog: jni_catalog,
545                jvm,
546                file_io_props,
547            })
548        })
549            .map_err(Into::into)
550    }
551
552    pub fn build_catalog(
553        file_io_props: HashMap<String, String>,
554        name: impl ToString,
555        catalog_impl: impl ToString,
556        java_catalog_props: HashMap<String, String>,
557    ) -> ConnectorResult<Arc<dyn Catalog>> {
558        let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
559        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
560    }
561}