risingwave_connector/connector_common/iceberg/
jni_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module provides [`JniCatalog`], an Iceberg [`Catalog`] backed by a Java catalog through JNI.
16
17#![expect(
18    clippy::disallowed_types,
19    reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33    TableUpdate,
34};
35use itertools::Itertools;
36use jni::JavaVM;
37use jni::objects::{GlobalRef, JObject};
38use risingwave_common::bail;
39use risingwave_common::global_jvm::JVM;
40use risingwave_jni_core::call_method;
41use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
42use serde::{Deserialize, Serialize};
43use thiserror_ext::AsReport;
44
45use crate::error::ConnectorResult;
46
47#[derive(Debug, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49struct LoadTableResponse {
50    pub metadata_location: Option<String>,
51    pub metadata: TableMetadata,
52    pub _config: Option<HashMap<String, String>>,
53}
54
55#[derive(Debug, Serialize)]
56#[serde(rename_all = "kebab-case")]
57struct CreateTableRequest {
58    /// The name of the table.
59    pub name: String,
60    /// The location of the table.
61    pub location: Option<String>,
62    /// The schema of the table.
63    pub schema: Schema,
64    /// The partition spec of the table, could be None.
65    pub partition_spec: Option<UnboundPartitionSpec>,
66    /// The sort order of the table.
67    pub write_order: Option<SortOrder>,
68    /// The properties of the table.
69    pub properties: HashMap<String, String>,
70}
71
72#[derive(Debug, Serialize, Deserialize)]
73struct CommitTableRequest {
74    identifier: TableIdent,
75    requirements: Vec<TableRequirement>,
76    updates: Vec<TableUpdate>,
77}
78
79#[derive(Debug, Serialize, Deserialize)]
80#[serde(rename_all = "kebab-case")]
81struct CommitTableResponse {
82    metadata_location: String,
83    metadata: TableMetadata,
84}
85
86#[derive(Debug, Serialize, Deserialize)]
87#[serde(rename_all = "kebab-case")]
88struct ListNamespacesResponse {
89    namespaces: Vec<NamespaceIdent>,
90    next_page_token: Option<String>,
91}
92
93#[derive(Debug, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95struct ListTablesResponse {
96    identifiers: Vec<TableIdent>,
97    next_page_token: Option<String>,
98}
99
100impl From<&TableCreation> for CreateTableRequest {
101    fn from(value: &TableCreation) -> Self {
102        Self {
103            name: value.name.clone(),
104            location: value.location.clone(),
105            schema: value.schema.clone(),
106            partition_spec: value.partition_spec.clone(),
107            write_order: value.sort_order.clone(),
108            properties: value.properties.clone(),
109        }
110    }
111}
112
113#[derive(Debug)]
114pub struct JniCatalog {
115    java_catalog: GlobalRef,
116    jvm: &'static JavaVM,
117    file_io_props: HashMap<String, String>,
118}
119
120#[async_trait]
121impl Catalog for JniCatalog {
122    /// List namespaces from the catalog.
123    async fn list_namespaces(
124        &self,
125        _parent: Option<&NamespaceIdent>,
126    ) -> iceberg::Result<Vec<NamespaceIdent>> {
127        execute_with_jni_env(self.jvm, |env| {
128            let result_json =
129                call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
130                    .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
131
132            let rust_json_str = jobj_to_str(env, result_json)?;
133
134            let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
135
136            Ok(resp.namespaces)
137        })
138        .map_err(|e| {
139            iceberg::Error::new(
140                iceberg::ErrorKind::Unexpected,
141                "Failed to list iceberg namespaces.",
142            )
143            .with_source(e)
144        })
145    }
146
147    /// Create a new namespace inside the catalog.
148    async fn create_namespace(
149        &self,
150        namespace: &iceberg::NamespaceIdent,
151        _properties: HashMap<String, String>,
152    ) -> iceberg::Result<iceberg::Namespace> {
153        execute_with_jni_env(self.jvm, |env| {
154            let namespace_jstr = if namespace.is_empty() {
155                env.new_string("").unwrap()
156            } else {
157                if namespace.len() > 1 {
158                    bail!("Namespace with more than one level is not supported!")
159                }
160                env.new_string(&namespace[0]).unwrap()
161            };
162
163            call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
164                &namespace_jstr)
165            .with_context(|| format!("Failed to create namespace: {namespace}"))?;
166
167            Ok(Namespace::new(namespace.clone()))
168        })
169        .map_err(|e| {
170            iceberg::Error::new(
171                iceberg::ErrorKind::Unexpected,
172                "Failed to create namespace.",
173            )
174            .with_source(e)
175        })
176    }
177
178    /// Get a namespace information from the catalog.
179    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
180        todo!()
181    }
182
183    /// Check if namespace exists in catalog.
184    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
185        execute_with_jni_env(self.jvm, |env| {
186            let namespace_jstr = if namespace.is_empty() {
187                env.new_string("").unwrap()
188            } else {
189                if namespace.len() > 1 {
190                    bail!("Namespace with more than one level is not supported!")
191                }
192                env.new_string(&namespace[0]).unwrap()
193            };
194
195            let exists =
196                call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
197                &namespace_jstr)
198                .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
199
200            Ok(exists)
201        })
202        .map_err(|e| {
203            iceberg::Error::new(
204                iceberg::ErrorKind::Unexpected,
205                "Failed to check namespace exists.",
206            )
207            .with_source(e)
208        })
209    }
210
211    /// Drop a namespace from the catalog.
212    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
213        todo!()
214    }
215
216    /// List tables from namespace.
217    async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
218        execute_with_jni_env(self.jvm, |env| {
219            let namespace_jstr = if namespace.is_empty() {
220                env.new_string("").unwrap()
221            } else {
222                if namespace.len() > 1 {
223                    bail!("Namespace with more than one level is not supported!")
224                }
225                env.new_string(&namespace[0]).unwrap()
226            };
227
228            let result_json =
229                call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
230                &namespace_jstr)
231                .with_context(|| {
232                    format!("Failed to list iceberg tables in namespace: {}", namespace)
233                })?;
234
235            let rust_json_str = jobj_to_str(env, result_json)?;
236
237            let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
238
239            Ok(resp.identifiers)
240        })
241        .map_err(|e| {
242            iceberg::Error::new(
243                iceberg::ErrorKind::Unexpected,
244                "Failed to list iceberg  tables.",
245            )
246            .with_source(e)
247        })
248    }
249
250    async fn update_namespace(
251        &self,
252        _namespace: &NamespaceIdent,
253        _properties: HashMap<String, String>,
254    ) -> iceberg::Result<()> {
255        todo!()
256    }
257
258    /// Create a new table inside the namespace.
259    async fn create_table(
260        &self,
261        namespace: &NamespaceIdent,
262        creation: TableCreation,
263    ) -> iceberg::Result<Table> {
264        execute_with_jni_env(self.jvm, |env| {
265            let namespace_jstr = if namespace.is_empty() {
266                env.new_string("").unwrap()
267            } else {
268                if namespace.len() > 1 {
269                    bail!("Namespace with more than one level is not supported!")
270                }
271                env.new_string(&namespace[0]).unwrap()
272            };
273
274            let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
275
276            let creation_jstr = env.new_string(&creation_str).unwrap();
277
278            let result_json =
279                call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
280                &namespace_jstr, &creation_jstr)
281                .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
282
283            let rust_json_str = jobj_to_str(env, result_json)?;
284
285            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
286
287            let metadata_location = resp.metadata_location.ok_or_else(|| {
288                iceberg::Error::new(
289                    iceberg::ErrorKind::FeatureUnsupported,
290                    "Loading uncommitted table is not supported!",
291                )
292            })?;
293
294            let table_metadata = resp.metadata;
295
296            let file_io = FileIO::from_path(&metadata_location)?
297                .with_props(self.file_io_props.iter())
298                .build()?;
299
300            Ok(Table::builder()
301                .file_io(file_io)
302                .identifier(TableIdent::new(namespace.clone(), creation.name))
303                .metadata(table_metadata)
304                .build())
305        })
306        .map_err(|e| {
307            iceberg::Error::new(
308                iceberg::ErrorKind::Unexpected,
309                "Failed to create iceberg table.",
310            )
311            .with_source(e)
312        })?
313    }
314
315    /// Load table from the catalog.
316    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
317        execute_with_jni_env(self.jvm, |env| {
318            let table_name_str = format!(
319                "{}.{}",
320                table.namespace().clone().inner().into_iter().join("."),
321                table.name()
322            );
323
324            let table_name_jstr = env.new_string(&table_name_str).unwrap();
325
326            let result_json =
327                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
328                &table_name_jstr)
329                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
330
331            let rust_json_str = jobj_to_str(env, result_json)?;
332
333            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
334
335            let metadata_location = resp.metadata_location.ok_or_else(|| {
336                iceberg::Error::new(
337                    iceberg::ErrorKind::FeatureUnsupported,
338                    "Loading uncommitted table is not supported!",
339                )
340            })?;
341
342            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
343
344            let table_metadata = resp.metadata;
345
346            let file_io = FileIO::from_path(&metadata_location)?
347                .with_props(self.file_io_props.iter())
348                .build()?;
349
350            Ok(Table::builder()
351                .file_io(file_io)
352                .identifier(table.clone())
353                .metadata(table_metadata)
354                .build())
355        })
356        .map_err(|e| {
357            iceberg::Error::new(
358                iceberg::ErrorKind::Unexpected,
359                "Failed to load iceberg table.",
360            )
361            .with_source(e)
362        })?
363    }
364
365    /// Drop a table from the catalog.
366    async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
367        let jvm = self.jvm;
368        let table = table.to_owned();
369        let java_catalog = self.java_catalog.clone();
370        // spawn blocking the drop table task, since dropping a table by default would purge the data which may take a long time.
371        tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
372            execute_with_jni_env(jvm, |env| {
373                let table_name_str = format!(
374                    "{}.{}",
375                    table.namespace().clone().inner().into_iter().join("."),
376                    table.name()
377                );
378
379                let table_name_jstr = env.new_string(&table_name_str).unwrap();
380
381                call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
382                &table_name_jstr)
383                .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
384
385                Ok(())
386            })
387            .map_err(|e| {
388                iceberg::Error::new(
389                    iceberg::ErrorKind::Unexpected,
390                    "Failed to drop iceberg table.",
391                )
392                .with_source(e)
393            })
394        })
395        .await
396        .map_err(|e| {
397            iceberg::Error::new(
398                iceberg::ErrorKind::Unexpected,
399                "Failed to drop iceberg table.",
400            )
401            .with_source(e)
402        })?
403    }
404
405    /// Check if a table exists in the catalog.
406    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
407        execute_with_jni_env(self.jvm, |env| {
408            let table_name_str = format!(
409                "{}.{}",
410                table.namespace().clone().inner().into_iter().join("."),
411                table.name()
412            );
413
414            let table_name_jstr = env.new_string(&table_name_str).unwrap();
415
416            let exists =
417                call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
418                &table_name_jstr)
419                .with_context(|| {
420                    format!("Failed to check iceberg table exists: {table_name_str}")
421                })?;
422
423            Ok(exists)
424        })
425        .map_err(|e| {
426            iceberg::Error::new(
427                iceberg::ErrorKind::Unexpected,
428                "Failed to check iceberg table exists.",
429            )
430            .with_source(e)
431        })
432    }
433
434    /// Rename a table in the catalog.
435    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
436        todo!()
437    }
438
439    /// Update a table to the catalog.
440    async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
441        execute_with_jni_env(self.jvm, |env| {
442            let requirements = commit.take_requirements();
443            let updates = commit.take_updates();
444            let request = CommitTableRequest {
445                identifier: commit.identifier().clone(),
446                requirements,
447                updates,
448            };
449            let request_str = serde_json::to_string(&request)?;
450
451            let request_jni_str = env.new_string(&request_str).with_context(|| {
452                format!("Failed to create jni string from request json: {request_str}.")
453            })?;
454
455            let result_json =
456                call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
457                &request_jni_str)
458                .with_context(|| {
459                    format!("Failed to update iceberg table: {}", commit.identifier())
460                })?;
461
462            let rust_json_str = jobj_to_str(env, result_json)?;
463
464            let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
465
466            tracing::info!(
467                "Table metadata location of {} is {}",
468                commit.identifier(),
469                response.metadata_location
470            );
471
472            let table_metadata = response.metadata;
473
474            let file_io = FileIO::from_path(&response.metadata_location)?
475                .with_props(self.file_io_props.iter())
476                .build()?;
477
478            Ok(Table::builder()
479                .file_io(file_io)
480                .identifier(commit.identifier().clone())
481                .metadata(table_metadata)
482                .build()?)
483        })
484        .map_err(|e| {
485            iceberg::Error::new(
486                iceberg::ErrorKind::Unexpected,
487                "Failed to update iceberg table.",
488            )
489            .with_source(e)
490        })
491    }
492}
493
494impl Drop for JniCatalog {
495    fn drop(&mut self) {
496        let _ = execute_with_jni_env(self.jvm, |env| {
497            call_method!(env, self.java_catalog.as_obj(), {void close()})
498                .with_context(|| "Failed to close iceberg catalog".to_owned())?;
499            Ok(())
500        })
501        .inspect_err(
502            |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
503        );
504    }
505}
506
507impl JniCatalog {
508    fn build(
509        file_io_props: HashMap<String, String>,
510        name: impl ToString,
511        catalog_impl: impl ToString,
512        java_catalog_props: HashMap<String, String>,
513    ) -> ConnectorResult<Self> {
514        let jvm = JVM.get_or_init()?;
515
516        execute_with_jni_env(jvm, |env| {
517            // Convert props to string array
518            let props = env.new_object_array(
519                (java_catalog_props.len() * 2) as i32,
520                "java/lang/String",
521                JObject::null(),
522            )?;
523            for (i, (key, value)) in java_catalog_props.iter().enumerate() {
524                let key_j_str = env.new_string(key)?;
525                let value_j_str = env.new_string(value)?;
526                env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
527                env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
528            }
529
530            let jni_catalog_wrapper = env
531                .call_static_method(
532                    "com/risingwave/connector/catalog/JniCatalogWrapper",
533                    "create",
534                    "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
535                    &[
536                        (&env.new_string(name.to_string()).unwrap()).into(),
537                        (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
538                        (&props).into(),
539                    ],
540                )?;
541
542            let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
543
544            Ok(Self {
545                java_catalog: jni_catalog,
546                jvm,
547                file_io_props,
548            })
549        })
550            .map_err(Into::into)
551    }
552
553    pub fn build_catalog(
554        file_io_props: HashMap<String, String>,
555        name: impl ToString,
556        catalog_impl: impl ToString,
557        java_catalog_props: HashMap<String, String>,
558    ) -> ConnectorResult<Arc<dyn Catalog>> {
559        let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
560        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
561    }
562}