risingwave_connector/connector_common/iceberg/
jni_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module provides a JNI-backed Iceberg catalog.
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use anyhow::Context;
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
25use iceberg::table::Table;
26use iceberg::{
27    Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
28    TableUpdate,
29};
30use itertools::Itertools;
31use jni::JavaVM;
32use jni::objects::{GlobalRef, JObject};
33use risingwave_common::bail;
34use risingwave_jni_core::call_method;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env, jobj_to_str};
36use serde::{Deserialize, Serialize};
37use thiserror_ext::AsReport;
38
39use crate::error::ConnectorResult;
40
41#[derive(Debug, Deserialize)]
42#[serde(rename_all = "kebab-case")]
43struct LoadTableResponse {
44    pub metadata_location: Option<String>,
45    pub metadata: TableMetadata,
46    pub _config: Option<HashMap<String, String>>,
47}
48
49#[derive(Debug, Serialize)]
50#[serde(rename_all = "kebab-case")]
51struct CreateTableRequest {
52    /// The name of the table.
53    pub name: String,
54    /// The location of the table.
55    pub location: Option<String>,
56    /// The schema of the table.
57    pub schema: Schema,
58    /// The partition spec of the table, could be None.
59    pub partition_spec: Option<UnboundPartitionSpec>,
60    /// The sort order of the table.
61    pub write_order: Option<SortOrder>,
62    /// The properties of the table.
63    pub properties: HashMap<String, String>,
64}
65
66#[derive(Debug, Serialize, Deserialize)]
67struct CommitTableRequest {
68    identifier: TableIdent,
69    requirements: Vec<TableRequirement>,
70    updates: Vec<TableUpdate>,
71}
72
73#[derive(Debug, Serialize, Deserialize)]
74#[serde(rename_all = "kebab-case")]
75struct CommitTableResponse {
76    metadata_location: String,
77    metadata: TableMetadata,
78}
79
80#[derive(Debug, Serialize, Deserialize)]
81#[serde(rename_all = "kebab-case")]
82struct ListNamespacesResponse {
83    namespaces: Vec<NamespaceIdent>,
84    next_page_token: Option<String>,
85}
86
87#[derive(Debug, Serialize, Deserialize)]
88#[serde(rename_all = "kebab-case")]
89struct ListTablesResponse {
90    identifiers: Vec<TableIdent>,
91    next_page_token: Option<String>,
92}
93
94impl From<&TableCreation> for CreateTableRequest {
95    fn from(value: &TableCreation) -> Self {
96        Self {
97            name: value.name.clone(),
98            location: value.location.clone(),
99            schema: value.schema.clone(),
100            partition_spec: value.partition_spec.clone(),
101            write_order: value.sort_order.clone(),
102            properties: value.properties.clone(),
103        }
104    }
105}
106
107#[derive(Debug)]
108pub struct JniCatalog {
109    java_catalog: GlobalRef,
110    jvm: &'static JavaVM,
111    file_io_props: HashMap<String, String>,
112}
113
114#[async_trait]
115impl Catalog for JniCatalog {
116    /// List namespaces from the catalog.
117    async fn list_namespaces(
118        &self,
119        _parent: Option<&NamespaceIdent>,
120    ) -> iceberg::Result<Vec<NamespaceIdent>> {
121        execute_with_jni_env(self.jvm, |env| {
122            let result_json =
123                call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
124                    .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
125
126            let rust_json_str = jobj_to_str(env, result_json)?;
127
128            let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
129
130            Ok(resp.namespaces)
131        })
132        .map_err(|e| {
133            iceberg::Error::new(
134                iceberg::ErrorKind::Unexpected,
135                "Failed to list iceberg namespaces.",
136            )
137            .with_source(e)
138        })
139    }
140
141    /// Create a new namespace inside the catalog.
142    async fn create_namespace(
143        &self,
144        namespace: &iceberg::NamespaceIdent,
145        _properties: HashMap<String, String>,
146    ) -> iceberg::Result<iceberg::Namespace> {
147        execute_with_jni_env(self.jvm, |env| {
148            let namespace_jstr = if namespace.is_empty() {
149                env.new_string("").unwrap()
150            } else {
151                if namespace.len() > 1 {
152                    bail!("Namespace with more than one level is not supported!")
153                }
154                env.new_string(&namespace[0]).unwrap()
155            };
156
157            call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
158                &namespace_jstr)
159            .with_context(|| format!("Failed to create namespace: {namespace}"))?;
160
161            Ok(Namespace::new(namespace.clone()))
162        })
163        .map_err(|e| {
164            iceberg::Error::new(
165                iceberg::ErrorKind::Unexpected,
166                "Failed to create namespace.",
167            )
168            .with_source(e)
169        })
170    }
171
172    /// Get a namespace information from the catalog.
173    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
174        todo!()
175    }
176
177    /// Check if namespace exists in catalog.
178    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
179        execute_with_jni_env(self.jvm, |env| {
180            let namespace_jstr = if namespace.is_empty() {
181                env.new_string("").unwrap()
182            } else {
183                if namespace.len() > 1 {
184                    bail!("Namespace with more than one level is not supported!")
185                }
186                env.new_string(&namespace[0]).unwrap()
187            };
188
189            let exists =
190                call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
191                &namespace_jstr)
192                .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
193
194            Ok(exists)
195        })
196        .map_err(|e| {
197            iceberg::Error::new(
198                iceberg::ErrorKind::Unexpected,
199                "Failed to check namespace exists.",
200            )
201            .with_source(e)
202        })
203    }
204
205    /// Drop a namespace from the catalog.
206    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
207        todo!()
208    }
209
210    /// List tables from namespace.
211    async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
212        execute_with_jni_env(self.jvm, |env| {
213            let namespace_jstr = if namespace.is_empty() {
214                env.new_string("").unwrap()
215            } else {
216                if namespace.len() > 1 {
217                    bail!("Namespace with more than one level is not supported!")
218                }
219                env.new_string(&namespace[0]).unwrap()
220            };
221
222            let result_json =
223                call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
224                &namespace_jstr)
225                .with_context(|| {
226                    format!("Failed to list iceberg tables in namespace: {}", namespace)
227                })?;
228
229            let rust_json_str = jobj_to_str(env, result_json)?;
230
231            let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
232
233            Ok(resp.identifiers)
234        })
235        .map_err(|e| {
236            iceberg::Error::new(
237                iceberg::ErrorKind::Unexpected,
238                "Failed to list iceberg  tables.",
239            )
240            .with_source(e)
241        })
242    }
243
244    async fn update_namespace(
245        &self,
246        _namespace: &NamespaceIdent,
247        _properties: HashMap<String, String>,
248    ) -> iceberg::Result<()> {
249        todo!()
250    }
251
252    /// Create a new table inside the namespace.
253    async fn create_table(
254        &self,
255        namespace: &NamespaceIdent,
256        creation: TableCreation,
257    ) -> iceberg::Result<Table> {
258        execute_with_jni_env(self.jvm, |env| {
259            let namespace_jstr = if namespace.is_empty() {
260                env.new_string("").unwrap()
261            } else {
262                if namespace.len() > 1 {
263                    bail!("Namespace with more than one level is not supported!")
264                }
265                env.new_string(&namespace[0]).unwrap()
266            };
267
268            let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
269
270            let creation_jstr = env.new_string(&creation_str).unwrap();
271
272            let result_json =
273                call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
274                &namespace_jstr, &creation_jstr)
275                .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
276
277            let rust_json_str = jobj_to_str(env, result_json)?;
278
279            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
280
281            let metadata_location = resp.metadata_location.ok_or_else(|| {
282                iceberg::Error::new(
283                    iceberg::ErrorKind::FeatureUnsupported,
284                    "Loading uncommitted table is not supported!",
285                )
286            })?;
287
288            let table_metadata = resp.metadata;
289
290            let file_io = FileIO::from_path(&metadata_location)?
291                .with_props(self.file_io_props.iter())
292                .build()?;
293
294            Ok(Table::builder()
295                .file_io(file_io)
296                .identifier(TableIdent::new(namespace.clone(), creation.name))
297                .metadata(table_metadata)
298                .build())
299        })
300        .map_err(|e| {
301            iceberg::Error::new(
302                iceberg::ErrorKind::Unexpected,
303                "Failed to create iceberg table.",
304            )
305            .with_source(e)
306        })?
307    }
308
309    /// Load table from the catalog.
310    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
311        execute_with_jni_env(self.jvm, |env| {
312            let table_name_str = format!(
313                "{}.{}",
314                table.namespace().clone().inner().into_iter().join("."),
315                table.name()
316            );
317
318            let table_name_jstr = env.new_string(&table_name_str).unwrap();
319
320            let result_json =
321                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
322                &table_name_jstr)
323                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
324
325            let rust_json_str = jobj_to_str(env, result_json)?;
326
327            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
328
329            let metadata_location = resp.metadata_location.ok_or_else(|| {
330                iceberg::Error::new(
331                    iceberg::ErrorKind::FeatureUnsupported,
332                    "Loading uncommitted table is not supported!",
333                )
334            })?;
335
336            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
337
338            let table_metadata = resp.metadata;
339
340            let file_io = FileIO::from_path(&metadata_location)?
341                .with_props(self.file_io_props.iter())
342                .build()?;
343
344            Ok(Table::builder()
345                .file_io(file_io)
346                .identifier(table.clone())
347                .metadata(table_metadata)
348                .build())
349        })
350        .map_err(|e| {
351            iceberg::Error::new(
352                iceberg::ErrorKind::Unexpected,
353                "Failed to load iceberg table.",
354            )
355            .with_source(e)
356        })?
357    }
358
359    /// Drop a table from the catalog.
360    async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
361        execute_with_jni_env(self.jvm, |env| {
362            let table_name_str = format!(
363                "{}.{}",
364                table.namespace().clone().inner().into_iter().join("."),
365                table.name()
366            );
367
368            let table_name_jstr = env.new_string(&table_name_str).unwrap();
369
370            call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
371            &table_name_jstr)
372            .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
373
374            Ok(())
375        })
376        .map_err(|e| {
377            iceberg::Error::new(
378                iceberg::ErrorKind::Unexpected,
379                "Failed to drop iceberg table.",
380            )
381            .with_source(e)
382        })
383    }
384
385    /// Check if a table exists in the catalog.
386    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
387        execute_with_jni_env(self.jvm, |env| {
388            let table_name_str = format!(
389                "{}.{}",
390                table.namespace().clone().inner().into_iter().join("."),
391                table.name()
392            );
393
394            let table_name_jstr = env.new_string(&table_name_str).unwrap();
395
396            let exists =
397                call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
398                &table_name_jstr)
399                .with_context(|| {
400                    format!("Failed to check iceberg table exists: {table_name_str}")
401                })?;
402
403            Ok(exists)
404        })
405        .map_err(|e| {
406            iceberg::Error::new(
407                iceberg::ErrorKind::Unexpected,
408                "Failed to check iceberg table exists.",
409            )
410            .with_source(e)
411        })
412    }
413
414    /// Rename a table in the catalog.
415    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
416        todo!()
417    }
418
419    /// Update a table to the catalog.
420    async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
421        execute_with_jni_env(self.jvm, |env| {
422            let requirements = commit.take_requirements();
423            let updates = commit.take_updates();
424            let request = CommitTableRequest {
425                identifier: commit.identifier().clone(),
426                requirements,
427                updates,
428            };
429            let request_str = serde_json::to_string(&request)?;
430
431            let request_jni_str = env.new_string(&request_str).with_context(|| {
432                format!("Failed to create jni string from request json: {request_str}.")
433            })?;
434
435            let result_json =
436                call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
437                &request_jni_str)
438                .with_context(|| {
439                    format!("Failed to update iceberg table: {}", commit.identifier())
440                })?;
441
442            let rust_json_str = jobj_to_str(env, result_json)?;
443
444            let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
445
446            tracing::info!(
447                "Table metadata location of {} is {}",
448                commit.identifier(),
449                response.metadata_location
450            );
451
452            let table_metadata = response.metadata;
453
454            let file_io = FileIO::from_path(&response.metadata_location)?
455                .with_props(self.file_io_props.iter())
456                .build()?;
457
458            Ok(Table::builder()
459                .file_io(file_io)
460                .identifier(commit.identifier().clone())
461                .metadata(table_metadata)
462                .build()?)
463        })
464        .map_err(|e| {
465            iceberg::Error::new(
466                iceberg::ErrorKind::Unexpected,
467                "Failed to update iceberg table.",
468            )
469            .with_source(e)
470        })
471    }
472}
473
474impl Drop for JniCatalog {
475    fn drop(&mut self) {
476        let _ = execute_with_jni_env(self.jvm, |env| {
477            call_method!(env, self.java_catalog.as_obj(), {void close()})
478                .with_context(|| "Failed to close iceberg catalog".to_owned())?;
479            Ok(())
480        })
481        .inspect_err(
482            |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
483        );
484    }
485}
486
487impl JniCatalog {
488    fn build(
489        file_io_props: HashMap<String, String>,
490        name: impl ToString,
491        catalog_impl: impl ToString,
492        java_catalog_props: HashMap<String, String>,
493    ) -> ConnectorResult<Self> {
494        let jvm = JVM.get_or_init()?;
495
496        execute_with_jni_env(jvm, |env| {
497            // Convert props to string array
498            let props = env.new_object_array(
499                (java_catalog_props.len() * 2) as i32,
500                "java/lang/String",
501                JObject::null(),
502            )?;
503            for (i, (key, value)) in java_catalog_props.iter().enumerate() {
504                let key_j_str = env.new_string(key)?;
505                let value_j_str = env.new_string(value)?;
506                env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
507                env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
508            }
509
510            let jni_catalog_wrapper = env
511                .call_static_method(
512                    "com/risingwave/connector/catalog/JniCatalogWrapper",
513                    "create",
514                    "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
515                    &[
516                        (&env.new_string(name.to_string()).unwrap()).into(),
517                        (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
518                        (&props).into(),
519                    ],
520                )?;
521
522            let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
523
524            Ok(Self {
525                java_catalog: jni_catalog,
526                jvm,
527                file_io_props,
528            })
529        })
530            .map_err(Into::into)
531    }
532
533    pub fn build_catalog(
534        file_io_props: HashMap<String, String>,
535        name: impl ToString,
536        catalog_impl: impl ToString,
537        java_catalog_props: HashMap<String, String>,
538    ) -> ConnectorResult<Arc<dyn Catalog>> {
539        let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
540        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
541    }
542}