risingwave_connector/connector_common/iceberg/
jni_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module provides a JNI-backed iceberg catalog.
16
17#![expect(
18    clippy::disallowed_types,
19    reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33    TableUpdate,
34};
35use itertools::Itertools;
36use jni::JavaVM;
37use jni::objects::{GlobalRef, JObject};
38use risingwave_common::bail;
39use risingwave_jni_core::call_method;
40use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env, jobj_to_str};
41use serde::{Deserialize, Serialize};
42use thiserror_ext::AsReport;
43
44use crate::error::ConnectorResult;
45
46#[derive(Debug, Deserialize)]
47#[serde(rename_all = "kebab-case")]
48struct LoadTableResponse {
49    pub metadata_location: Option<String>,
50    pub metadata: TableMetadata,
51    pub _config: Option<HashMap<String, String>>,
52}
53
54#[derive(Debug, Serialize)]
55#[serde(rename_all = "kebab-case")]
56struct CreateTableRequest {
57    /// The name of the table.
58    pub name: String,
59    /// The location of the table.
60    pub location: Option<String>,
61    /// The schema of the table.
62    pub schema: Schema,
63    /// The partition spec of the table, could be None.
64    pub partition_spec: Option<UnboundPartitionSpec>,
65    /// The sort order of the table.
66    pub write_order: Option<SortOrder>,
67    /// The properties of the table.
68    pub properties: HashMap<String, String>,
69}
70
71#[derive(Debug, Serialize, Deserialize)]
72struct CommitTableRequest {
73    identifier: TableIdent,
74    requirements: Vec<TableRequirement>,
75    updates: Vec<TableUpdate>,
76}
77
78#[derive(Debug, Serialize, Deserialize)]
79#[serde(rename_all = "kebab-case")]
80struct CommitTableResponse {
81    metadata_location: String,
82    metadata: TableMetadata,
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86#[serde(rename_all = "kebab-case")]
87struct ListNamespacesResponse {
88    namespaces: Vec<NamespaceIdent>,
89    next_page_token: Option<String>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93#[serde(rename_all = "kebab-case")]
94struct ListTablesResponse {
95    identifiers: Vec<TableIdent>,
96    next_page_token: Option<String>,
97}
98
99impl From<&TableCreation> for CreateTableRequest {
100    fn from(value: &TableCreation) -> Self {
101        Self {
102            name: value.name.clone(),
103            location: value.location.clone(),
104            schema: value.schema.clone(),
105            partition_spec: value.partition_spec.clone(),
106            write_order: value.sort_order.clone(),
107            properties: value.properties.clone(),
108        }
109    }
110}
111
112#[derive(Debug)]
113pub struct JniCatalog {
114    java_catalog: GlobalRef,
115    jvm: &'static JavaVM,
116    file_io_props: HashMap<String, String>,
117}
118
119#[async_trait]
120impl Catalog for JniCatalog {
121    /// List namespaces from the catalog.
122    async fn list_namespaces(
123        &self,
124        _parent: Option<&NamespaceIdent>,
125    ) -> iceberg::Result<Vec<NamespaceIdent>> {
126        execute_with_jni_env(self.jvm, |env| {
127            let result_json =
128                call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
129                    .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
130
131            let rust_json_str = jobj_to_str(env, result_json)?;
132
133            let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
134
135            Ok(resp.namespaces)
136        })
137        .map_err(|e| {
138            iceberg::Error::new(
139                iceberg::ErrorKind::Unexpected,
140                "Failed to list iceberg namespaces.",
141            )
142            .with_source(e)
143        })
144    }
145
146    /// Create a new namespace inside the catalog.
147    async fn create_namespace(
148        &self,
149        namespace: &iceberg::NamespaceIdent,
150        _properties: HashMap<String, String>,
151    ) -> iceberg::Result<iceberg::Namespace> {
152        execute_with_jni_env(self.jvm, |env| {
153            let namespace_jstr = if namespace.is_empty() {
154                env.new_string("").unwrap()
155            } else {
156                if namespace.len() > 1 {
157                    bail!("Namespace with more than one level is not supported!")
158                }
159                env.new_string(&namespace[0]).unwrap()
160            };
161
162            call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
163                &namespace_jstr)
164            .with_context(|| format!("Failed to create namespace: {namespace}"))?;
165
166            Ok(Namespace::new(namespace.clone()))
167        })
168        .map_err(|e| {
169            iceberg::Error::new(
170                iceberg::ErrorKind::Unexpected,
171                "Failed to create namespace.",
172            )
173            .with_source(e)
174        })
175    }
176
177    /// Get a namespace information from the catalog.
178    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
179        todo!()
180    }
181
182    /// Check if namespace exists in catalog.
183    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
184        execute_with_jni_env(self.jvm, |env| {
185            let namespace_jstr = if namespace.is_empty() {
186                env.new_string("").unwrap()
187            } else {
188                if namespace.len() > 1 {
189                    bail!("Namespace with more than one level is not supported!")
190                }
191                env.new_string(&namespace[0]).unwrap()
192            };
193
194            let exists =
195                call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
196                &namespace_jstr)
197                .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
198
199            Ok(exists)
200        })
201        .map_err(|e| {
202            iceberg::Error::new(
203                iceberg::ErrorKind::Unexpected,
204                "Failed to check namespace exists.",
205            )
206            .with_source(e)
207        })
208    }
209
210    /// Drop a namespace from the catalog.
211    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
212        todo!()
213    }
214
215    /// List tables from namespace.
216    async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
217        execute_with_jni_env(self.jvm, |env| {
218            let namespace_jstr = if namespace.is_empty() {
219                env.new_string("").unwrap()
220            } else {
221                if namespace.len() > 1 {
222                    bail!("Namespace with more than one level is not supported!")
223                }
224                env.new_string(&namespace[0]).unwrap()
225            };
226
227            let result_json =
228                call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
229                &namespace_jstr)
230                .with_context(|| {
231                    format!("Failed to list iceberg tables in namespace: {}", namespace)
232                })?;
233
234            let rust_json_str = jobj_to_str(env, result_json)?;
235
236            let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
237
238            Ok(resp.identifiers)
239        })
240        .map_err(|e| {
241            iceberg::Error::new(
242                iceberg::ErrorKind::Unexpected,
243                "Failed to list iceberg  tables.",
244            )
245            .with_source(e)
246        })
247    }
248
249    async fn update_namespace(
250        &self,
251        _namespace: &NamespaceIdent,
252        _properties: HashMap<String, String>,
253    ) -> iceberg::Result<()> {
254        todo!()
255    }
256
257    /// Create a new table inside the namespace.
258    async fn create_table(
259        &self,
260        namespace: &NamespaceIdent,
261        creation: TableCreation,
262    ) -> iceberg::Result<Table> {
263        execute_with_jni_env(self.jvm, |env| {
264            let namespace_jstr = if namespace.is_empty() {
265                env.new_string("").unwrap()
266            } else {
267                if namespace.len() > 1 {
268                    bail!("Namespace with more than one level is not supported!")
269                }
270                env.new_string(&namespace[0]).unwrap()
271            };
272
273            let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
274
275            let creation_jstr = env.new_string(&creation_str).unwrap();
276
277            let result_json =
278                call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
279                &namespace_jstr, &creation_jstr)
280                .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
281
282            let rust_json_str = jobj_to_str(env, result_json)?;
283
284            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
285
286            let metadata_location = resp.metadata_location.ok_or_else(|| {
287                iceberg::Error::new(
288                    iceberg::ErrorKind::FeatureUnsupported,
289                    "Loading uncommitted table is not supported!",
290                )
291            })?;
292
293            let table_metadata = resp.metadata;
294
295            let file_io = FileIO::from_path(&metadata_location)?
296                .with_props(self.file_io_props.iter())
297                .build()?;
298
299            Ok(Table::builder()
300                .file_io(file_io)
301                .identifier(TableIdent::new(namespace.clone(), creation.name))
302                .metadata(table_metadata)
303                .build())
304        })
305        .map_err(|e| {
306            iceberg::Error::new(
307                iceberg::ErrorKind::Unexpected,
308                "Failed to create iceberg table.",
309            )
310            .with_source(e)
311        })?
312    }
313
314    /// Load table from the catalog.
315    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
316        execute_with_jni_env(self.jvm, |env| {
317            let table_name_str = format!(
318                "{}.{}",
319                table.namespace().clone().inner().into_iter().join("."),
320                table.name()
321            );
322
323            let table_name_jstr = env.new_string(&table_name_str).unwrap();
324
325            let result_json =
326                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
327                &table_name_jstr)
328                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
329
330            let rust_json_str = jobj_to_str(env, result_json)?;
331
332            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
333
334            let metadata_location = resp.metadata_location.ok_or_else(|| {
335                iceberg::Error::new(
336                    iceberg::ErrorKind::FeatureUnsupported,
337                    "Loading uncommitted table is not supported!",
338                )
339            })?;
340
341            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
342
343            let table_metadata = resp.metadata;
344
345            let file_io = FileIO::from_path(&metadata_location)?
346                .with_props(self.file_io_props.iter())
347                .build()?;
348
349            Ok(Table::builder()
350                .file_io(file_io)
351                .identifier(table.clone())
352                .metadata(table_metadata)
353                .build())
354        })
355        .map_err(|e| {
356            iceberg::Error::new(
357                iceberg::ErrorKind::Unexpected,
358                "Failed to load iceberg table.",
359            )
360            .with_source(e)
361        })?
362    }
363
364    /// Drop a table from the catalog.
365    async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
366        execute_with_jni_env(self.jvm, |env| {
367            let table_name_str = format!(
368                "{}.{}",
369                table.namespace().clone().inner().into_iter().join("."),
370                table.name()
371            );
372
373            let table_name_jstr = env.new_string(&table_name_str).unwrap();
374
375            call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
376            &table_name_jstr)
377            .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
378
379            Ok(())
380        })
381        .map_err(|e| {
382            iceberg::Error::new(
383                iceberg::ErrorKind::Unexpected,
384                "Failed to drop iceberg table.",
385            )
386            .with_source(e)
387        })
388    }
389
390    /// Check if a table exists in the catalog.
391    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
392        execute_with_jni_env(self.jvm, |env| {
393            let table_name_str = format!(
394                "{}.{}",
395                table.namespace().clone().inner().into_iter().join("."),
396                table.name()
397            );
398
399            let table_name_jstr = env.new_string(&table_name_str).unwrap();
400
401            let exists =
402                call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
403                &table_name_jstr)
404                .with_context(|| {
405                    format!("Failed to check iceberg table exists: {table_name_str}")
406                })?;
407
408            Ok(exists)
409        })
410        .map_err(|e| {
411            iceberg::Error::new(
412                iceberg::ErrorKind::Unexpected,
413                "Failed to check iceberg table exists.",
414            )
415            .with_source(e)
416        })
417    }
418
419    /// Rename a table in the catalog.
420    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
421        todo!()
422    }
423
424    /// Update a table to the catalog.
425    async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
426        execute_with_jni_env(self.jvm, |env| {
427            let requirements = commit.take_requirements();
428            let updates = commit.take_updates();
429            let request = CommitTableRequest {
430                identifier: commit.identifier().clone(),
431                requirements,
432                updates,
433            };
434            let request_str = serde_json::to_string(&request)?;
435
436            let request_jni_str = env.new_string(&request_str).with_context(|| {
437                format!("Failed to create jni string from request json: {request_str}.")
438            })?;
439
440            let result_json =
441                call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
442                &request_jni_str)
443                .with_context(|| {
444                    format!("Failed to update iceberg table: {}", commit.identifier())
445                })?;
446
447            let rust_json_str = jobj_to_str(env, result_json)?;
448
449            let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
450
451            tracing::info!(
452                "Table metadata location of {} is {}",
453                commit.identifier(),
454                response.metadata_location
455            );
456
457            let table_metadata = response.metadata;
458
459            let file_io = FileIO::from_path(&response.metadata_location)?
460                .with_props(self.file_io_props.iter())
461                .build()?;
462
463            Ok(Table::builder()
464                .file_io(file_io)
465                .identifier(commit.identifier().clone())
466                .metadata(table_metadata)
467                .build()?)
468        })
469        .map_err(|e| {
470            iceberg::Error::new(
471                iceberg::ErrorKind::Unexpected,
472                "Failed to update iceberg table.",
473            )
474            .with_source(e)
475        })
476    }
477}
478
479impl Drop for JniCatalog {
480    fn drop(&mut self) {
481        let _ = execute_with_jni_env(self.jvm, |env| {
482            call_method!(env, self.java_catalog.as_obj(), {void close()})
483                .with_context(|| "Failed to close iceberg catalog".to_owned())?;
484            Ok(())
485        })
486        .inspect_err(
487            |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
488        );
489    }
490}
491
492impl JniCatalog {
493    fn build(
494        file_io_props: HashMap<String, String>,
495        name: impl ToString,
496        catalog_impl: impl ToString,
497        java_catalog_props: HashMap<String, String>,
498    ) -> ConnectorResult<Self> {
499        let jvm = JVM.get_or_init()?;
500
501        execute_with_jni_env(jvm, |env| {
502            // Convert props to string array
503            let props = env.new_object_array(
504                (java_catalog_props.len() * 2) as i32,
505                "java/lang/String",
506                JObject::null(),
507            )?;
508            for (i, (key, value)) in java_catalog_props.iter().enumerate() {
509                let key_j_str = env.new_string(key)?;
510                let value_j_str = env.new_string(value)?;
511                env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
512                env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
513            }
514
515            let jni_catalog_wrapper = env
516                .call_static_method(
517                    "com/risingwave/connector/catalog/JniCatalogWrapper",
518                    "create",
519                    "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
520                    &[
521                        (&env.new_string(name.to_string()).unwrap()).into(),
522                        (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
523                        (&props).into(),
524                    ],
525                )?;
526
527            let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
528
529            Ok(Self {
530                java_catalog: jni_catalog,
531                jvm,
532                file_io_props,
533            })
534        })
535            .map_err(Into::into)
536    }
537
538    pub fn build_catalog(
539        file_io_props: HashMap<String, String>,
540        name: impl ToString,
541        catalog_impl: impl ToString,
542        java_catalog_props: HashMap<String, String>,
543    ) -> ConnectorResult<Arc<dyn Catalog>> {
544        let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
545        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
546    }
547}