risingwave_connector/connector_common/iceberg/
jni_catalog.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module provides the JNI catalog.
16
17#![expect(
18    clippy::disallowed_types,
19    reason = "construct iceberg::Error to implement the trait"
20)]
21
22use std::collections::HashMap;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use anyhow::Context;
27use async_trait::async_trait;
28use iceberg::io::FileIO;
29use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
33    TableUpdate,
34};
35use itertools::Itertools;
36use jni::objects::{GlobalRef, JObject};
37use risingwave_common::global_jvm::Jvm;
38use risingwave_jni_core::call_method;
39use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str};
40use serde::{Deserialize, Serialize};
41use thiserror_ext::AsReport;
42
43use crate::error::ConnectorResult;
44
45#[derive(Debug, Deserialize)]
46#[serde(rename_all = "kebab-case")]
47struct LoadTableResponse {
48    pub metadata_location: Option<String>,
49    pub metadata: TableMetadata,
50    pub _config: Option<HashMap<String, String>>,
51}
52
53#[derive(Debug, Serialize)]
54#[serde(rename_all = "kebab-case")]
55struct CreateTableRequest {
56    /// The name of the table.
57    pub name: String,
58    /// The location of the table.
59    pub location: Option<String>,
60    /// The schema of the table.
61    pub schema: Schema,
62    /// The partition spec of the table, could be None.
63    pub partition_spec: Option<UnboundPartitionSpec>,
64    /// The sort order of the table.
65    pub write_order: Option<SortOrder>,
66    /// The properties of the table.
67    pub properties: HashMap<String, String>,
68}
69
70#[derive(Debug, Serialize, Deserialize)]
71struct CommitTableRequest {
72    identifier: TableIdent,
73    requirements: Vec<TableRequirement>,
74    updates: Vec<TableUpdate>,
75}
76
77#[derive(Debug, Serialize, Deserialize)]
78#[serde(rename_all = "kebab-case")]
79struct CommitTableResponse {
80    metadata_location: String,
81    metadata: TableMetadata,
82}
83
84#[derive(Debug, Serialize, Deserialize)]
85#[serde(rename_all = "kebab-case")]
86struct ListNamespacesResponse {
87    namespaces: Vec<NamespaceIdent>,
88    next_page_token: Option<String>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
92#[serde(rename_all = "kebab-case")]
93struct ListTablesResponse {
94    identifiers: Vec<TableIdent>,
95    next_page_token: Option<String>,
96}
97
98impl From<&TableCreation> for CreateTableRequest {
99    fn from(value: &TableCreation) -> Self {
100        Self {
101            name: value.name.clone(),
102            location: value.location.clone(),
103            schema: value.schema.clone(),
104            partition_spec: value.partition_spec.clone(),
105            write_order: value.sort_order.clone(),
106            properties: value.properties.clone(),
107        }
108    }
109}
110
111fn namespace_to_string(namespace: &NamespaceIdent) -> String {
112    namespace.iter().join(".")
113}
114
115#[derive(Debug)]
116pub struct JniCatalog {
117    java_catalog: GlobalRef,
118    jvm: Jvm,
119    file_io_props: HashMap<String, String>,
120}
121
122#[async_trait]
123impl Catalog for JniCatalog {
124    /// List namespaces from the catalog.
125    async fn list_namespaces(
126        &self,
127        _parent: Option<&NamespaceIdent>,
128    ) -> iceberg::Result<Vec<NamespaceIdent>> {
129        execute_with_jni_env(self.jvm, |env| {
130            let result_json =
131                call_method!(env, self.java_catalog.as_obj(), {String listNamespaces()})
132                    .with_context(|| "Failed to list iceberg namespaces".to_owned())?;
133
134            let rust_json_str = jobj_to_str(env, result_json)?;
135
136            let resp: ListNamespacesResponse = serde_json::from_str(&rust_json_str)?;
137
138            Ok(resp.namespaces)
139        })
140        .map_err(|e| {
141            iceberg::Error::new(
142                iceberg::ErrorKind::Unexpected,
143                "Failed to list iceberg namespaces.",
144            )
145            .with_source(e)
146        })
147    }
148
149    /// Create a new namespace inside the catalog.
150    async fn create_namespace(
151        &self,
152        namespace: &iceberg::NamespaceIdent,
153        _properties: HashMap<String, String>,
154    ) -> iceberg::Result<iceberg::Namespace> {
155        execute_with_jni_env(self.jvm, |env| {
156            let namespace_str = namespace_to_string(namespace);
157            let namespace_jstr = env.new_string(&namespace_str).unwrap();
158
159            call_method!(env, self.java_catalog.as_obj(), {void createNamespace(String)},
160                &namespace_jstr)
161            .with_context(|| format!("Failed to create namespace: {namespace}"))?;
162
163            Ok(Namespace::new(namespace.clone()))
164        })
165        .map_err(|e| {
166            iceberg::Error::new(
167                iceberg::ErrorKind::Unexpected,
168                "Failed to create namespace.",
169            )
170            .with_source(e)
171        })
172    }
173
174    /// Get a namespace information from the catalog.
175    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
176        todo!()
177    }
178
179    /// Check if namespace exists in catalog.
180    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> iceberg::Result<bool> {
181        execute_with_jni_env(self.jvm, |env| {
182            let namespace_str = namespace_to_string(namespace);
183            let namespace_jstr = env.new_string(&namespace_str).unwrap();
184
185            let exists =
186                call_method!(env, self.java_catalog.as_obj(), {boolean namespaceExists(String)},
187                &namespace_jstr)
188                .with_context(|| format!("Failed to check namespace exists: {namespace}"))?;
189
190            Ok(exists)
191        })
192        .map_err(|e| {
193            iceberg::Error::new(
194                iceberg::ErrorKind::Unexpected,
195                "Failed to check namespace exists.",
196            )
197            .with_source(e)
198        })
199    }
200
201    /// Drop a namespace from the catalog.
202    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
203        todo!()
204    }
205
206    /// List tables from namespace.
207    async fn list_tables(&self, namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
208        execute_with_jni_env(self.jvm, |env| {
209            let namespace_str = namespace_to_string(namespace);
210            let namespace_jstr = env.new_string(&namespace_str).unwrap();
211
212            let result_json =
213                call_method!(env, self.java_catalog.as_obj(), {String listTables(String)},
214                &namespace_jstr)
215                .with_context(|| {
216                    format!("Failed to list iceberg tables in namespace: {}", namespace)
217                })?;
218
219            let rust_json_str = jobj_to_str(env, result_json)?;
220
221            let resp: ListTablesResponse = serde_json::from_str(&rust_json_str)?;
222
223            Ok(resp.identifiers)
224        })
225        .map_err(|e| {
226            iceberg::Error::new(
227                iceberg::ErrorKind::Unexpected,
228                "Failed to list iceberg  tables.",
229            )
230            .with_source(e)
231        })
232    }
233
234    async fn update_namespace(
235        &self,
236        _namespace: &NamespaceIdent,
237        _properties: HashMap<String, String>,
238    ) -> iceberg::Result<()> {
239        todo!()
240    }
241
242    /// Create a new table inside the namespace.
243    async fn create_table(
244        &self,
245        namespace: &NamespaceIdent,
246        creation: TableCreation,
247    ) -> iceberg::Result<Table> {
248        execute_with_jni_env(self.jvm, |env| {
249            let namespace_str = namespace_to_string(namespace);
250            let namespace_jstr = env.new_string(&namespace_str).unwrap();
251
252            let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;
253
254            let creation_jstr = env.new_string(&creation_str).unwrap();
255
256            let result_json =
257                call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
258                &namespace_jstr, &creation_jstr)
259                .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;
260
261            let rust_json_str = jobj_to_str(env, result_json)?;
262
263            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
264
265            let metadata_location = resp.metadata_location.ok_or_else(|| {
266                iceberg::Error::new(
267                    iceberg::ErrorKind::FeatureUnsupported,
268                    "Loading uncommitted table is not supported!",
269                )
270            })?;
271
272            let table_metadata = resp.metadata;
273
274            let file_io = FileIO::from_path(&metadata_location)?
275                .with_props(self.file_io_props.iter())
276                .build()?;
277
278            Ok(Table::builder()
279                .file_io(file_io)
280                .identifier(TableIdent::new(namespace.clone(), creation.name))
281                .metadata(table_metadata)
282                .build())
283        })
284        .map_err(|e| {
285            iceberg::Error::new(
286                iceberg::ErrorKind::Unexpected,
287                "Failed to create iceberg table.",
288            )
289            .with_source(e)
290        })?
291    }
292
293    /// Load table from the catalog.
294    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
295        execute_with_jni_env(self.jvm, |env| {
296            let table_name_str = table.to_string();
297
298            let table_name_jstr = env.new_string(&table_name_str).unwrap();
299
300            let result_json =
301                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
302                &table_name_jstr)
303                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
304
305            let rust_json_str = jobj_to_str(env, result_json)?;
306
307            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
308
309            let metadata_location = resp.metadata_location.ok_or_else(|| {
310                iceberg::Error::new(
311                    iceberg::ErrorKind::FeatureUnsupported,
312                    "Loading uncommitted table is not supported!",
313                )
314            })?;
315
316            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
317
318            let table_metadata = resp.metadata;
319
320            let file_io = FileIO::from_path(&metadata_location)?
321                .with_props(self.file_io_props.iter())
322                .build()?;
323
324            Ok(Table::builder()
325                .file_io(file_io)
326                .identifier(table.clone())
327                .metadata(table_metadata)
328                .build())
329        })
330        .map_err(|e| {
331            iceberg::Error::new(
332                iceberg::ErrorKind::Unexpected,
333                "Failed to load iceberg table.",
334            )
335            .with_source(e)
336        })?
337    }
338
339    /// Drop a table from the catalog.
340    async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
341        let jvm = self.jvm;
342        let table = table.to_owned();
343        let java_catalog = self.java_catalog.clone();
344        // spawn blocking the drop table task, since dropping a table by default would purge the data which may take a long time.
345        tokio::task::spawn_blocking(move || -> iceberg::Result<()> {
346            execute_with_jni_env(jvm, |env| {
347                let table_name_str = table.to_string();
348
349                let table_name_jstr = env.new_string(&table_name_str).unwrap();
350
351                call_method!(env, java_catalog.as_obj(), {boolean dropTable(String)},
352                &table_name_jstr)
353                .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;
354
355                Ok(())
356            })
357            .map_err(|e| {
358                iceberg::Error::new(
359                    iceberg::ErrorKind::Unexpected,
360                    "Failed to drop iceberg table.",
361                )
362                .with_source(e)
363            })
364        })
365        .await
366        .map_err(|e| {
367            iceberg::Error::new(
368                iceberg::ErrorKind::Unexpected,
369                "Failed to drop iceberg table.",
370            )
371            .with_source(e)
372        })?
373    }
374
375    async fn register_table(
376        &self,
377        _table_ident: &TableIdent,
378        _metadata_location: String,
379    ) -> iceberg::Result<Table> {
380        Err(iceberg::Error::new(
381            iceberg::ErrorKind::Unexpected,
382            "register_table is not supported by JniCatalog",
383        ))
384    }
385
386    /// Check if a table exists in the catalog.
387    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
388        execute_with_jni_env(self.jvm, |env| {
389            let table_name_str = table.to_string();
390
391            let table_name_jstr = env.new_string(&table_name_str).unwrap();
392
393            let exists =
394                call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
395                &table_name_jstr)
396                .with_context(|| {
397                    format!("Failed to check iceberg table exists: {table_name_str}")
398                })?;
399
400            Ok(exists)
401        })
402        .map_err(|e| {
403            iceberg::Error::new(
404                iceberg::ErrorKind::Unexpected,
405                "Failed to check iceberg table exists.",
406            )
407            .with_source(e)
408        })
409    }
410
411    /// Rename a table in the catalog.
412    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
413        todo!()
414    }
415
416    /// Update a table to the catalog.
417    async fn update_table(&self, mut commit: TableCommit) -> iceberg::Result<Table> {
418        execute_with_jni_env(self.jvm, |env| {
419            let requirements = commit.take_requirements();
420            let updates = commit.take_updates();
421            let request = CommitTableRequest {
422                identifier: commit.identifier().clone(),
423                requirements,
424                updates,
425            };
426            let request_str = serde_json::to_string(&request)?;
427
428            let request_jni_str = env.new_string(&request_str).with_context(|| {
429                format!("Failed to create jni string from request json: {request_str}.")
430            })?;
431
432            let result_json =
433                call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
434                &request_jni_str)
435                .with_context(|| {
436                    format!("Failed to update iceberg table: {}", commit.identifier())
437                })?;
438
439            let rust_json_str = jobj_to_str(env, result_json)?;
440
441            let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
442
443            tracing::info!(
444                "Table metadata location of {} is {}",
445                commit.identifier(),
446                response.metadata_location
447            );
448
449            let table_metadata = response.metadata;
450
451            let file_io = FileIO::from_path(&response.metadata_location)?
452                .with_props(self.file_io_props.iter())
453                .build()?;
454
455            Ok(Table::builder()
456                .file_io(file_io)
457                .identifier(commit.identifier().clone())
458                .metadata(table_metadata)
459                .build()?)
460        })
461        .map_err(|e| {
462            iceberg::Error::new(
463                iceberg::ErrorKind::Unexpected,
464                "Failed to update iceberg table.",
465            )
466            .with_source(e)
467        })
468    }
469}
470
471impl Drop for JniCatalog {
472    fn drop(&mut self) {
473        let _ = execute_with_jni_env(self.jvm, |env| {
474            call_method!(env, self.java_catalog.as_obj(), {void close()})
475                .with_context(|| "Failed to close iceberg catalog".to_owned())?;
476            Ok(())
477        })
478        .inspect_err(
479            |e| tracing::error!(error = ?e.as_report(), "Failed to close iceberg catalog"),
480        );
481    }
482}
483
484impl JniCatalog {
485    fn build(
486        file_io_props: HashMap<String, String>,
487        name: impl ToString,
488        catalog_impl: impl ToString,
489        java_catalog_props: HashMap<String, String>,
490    ) -> ConnectorResult<Self> {
491        let jvm = Jvm::get_or_init()?;
492
493        execute_with_jni_env(jvm, |env| {
494            // Convert props to string array
495            let props = env.new_object_array(
496                (java_catalog_props.len() * 2) as i32,
497                "java/lang/String",
498                JObject::null(),
499            )?;
500            for (i, (key, value)) in java_catalog_props.iter().enumerate() {
501                let key_j_str = env.new_string(key)?;
502                let value_j_str = env.new_string(value)?;
503                env.set_object_array_element(&props, i as i32 * 2, key_j_str)?;
504                env.set_object_array_element(&props, i as i32 * 2 + 1, value_j_str)?;
505            }
506
507            let jni_catalog_wrapper = env
508                .call_static_method(
509                    "com/risingwave/connector/catalog/JniCatalogWrapper",
510                    "create",
511                    "(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/risingwave/connector/catalog/JniCatalogWrapper;",
512                    &[
513                        (&env.new_string(name.to_string()).unwrap()).into(),
514                        (&env.new_string(catalog_impl.to_string()).unwrap()).into(),
515                        (&props).into(),
516                    ],
517                )?;
518
519            let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
520
521            Ok(Self {
522                java_catalog: jni_catalog,
523                jvm,
524                file_io_props,
525            })
526        })
527            .map_err(Into::into)
528    }
529
530    pub fn build_catalog(
531        file_io_props: HashMap<String, String>,
532        name: impl ToString,
533        catalog_impl: impl ToString,
534        java_catalog_props: HashMap<String, String>,
535    ) -> ConnectorResult<Arc<dyn Catalog>> {
536        let catalog = Self::build(file_io_props, name, catalog_impl, java_catalog_props)?;
537        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
538    }
539}