// risingwave_jni_core/opendal_schema_history.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::borrow::Cow;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use jni::objects::{JByteArray, JObject, JString};
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_common::{DATA_DIRECTORY, STATE_STORE_URL};
22use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
23use risingwave_object_store::object::{ObjectStoreImpl, build_remote_object_store};
24use tokio::sync::OnceCell;
25
26use crate::{EnvParam, JAVA_BINDING_ASYNC_RUNTIME, execute_and_catch, to_guarded_slice};
27
28static OBJECT_STORE_INSTANCE: OnceCell<Arc<ObjectStoreImpl>> = OnceCell::const_new();
29
/// Security safeguard: accepts only paths that end in `.dat`.
///
/// This prevents schema history operations from accidentally overwriting Hummock data or
/// other critical files that share the object store.
///
/// # Errors
///
/// Returns a human-readable message when `path` is empty or does not end in `.dat`.
fn validate_dat_file_extension(path: &str) -> Result<(), String> {
    match path {
        "" => Err("File path cannot be empty".to_owned()),
        accepted if accepted.ends_with(".dat") => Ok(()),
        rejected => Err(format!(
            "Security violation: Only .dat files are allowed for schema history operations, got: {}",
            rejected
        )),
    }
}

47// schema history is internal state, all data is stored under the DATA_DIRECTORY directory.
48fn prepend_data_directory(path: &str) -> String {
49    let data_dir = DATA_DIRECTORY.get().map(|s| s.as_str()).expect(
50        "DATA_DIRECTORY is not set. This is dangerous in cloud environments as it can cause data conflicts between multiple instances sharing the same bucket. Please ensure data_directory is properly configured."
51    );
52
53    if data_dir.ends_with('/') || path.starts_with('/') {
54        format!("{}{}", data_dir, path)
55    } else {
56        format!("{}/{}", data_dir, path)
57    }
58}
59
60async fn get_object_store() -> Arc<ObjectStoreImpl> {
61    OBJECT_STORE_INSTANCE
62        .get_or_init(|| async {
63            let hummock_url = STATE_STORE_URL.get().unwrap();
64            let object_store = build_remote_object_store(
65                hummock_url.strip_prefix("hummock+").unwrap_or("memory"),
66                Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()),
67                "rw-cdc-schema-history",
68                Arc::new(ObjectStoreConfig::default()),
69            )
70            .await;
71            Arc::new(object_store)
72        })
73        .await
74        .clone()
75}
76
77/// Initialize `STATE_STORE_URL` and `DATA_DIRECTORY` for integration tests.
78/// Must be called before any schema history operations if compute node is not running.
79#[unsafe(no_mangle)]
80pub extern "system" fn Java_com_risingwave_java_binding_Binding_initObjectStoreForTest(
81    env: EnvParam<'_>,
82    state_store_url: JString<'_>,
83    data_directory: JString<'_>,
84) {
85    execute_and_catch(env, move |env| {
86        let state_store_url_str = env.get_string(&state_store_url).map_err(|e| anyhow!(e))?;
87        let state_store_url_str: Cow<'_, str> = (&state_store_url_str).into();
88
89        let data_directory_str = env.get_string(&data_directory).map_err(|e| anyhow!(e))?;
90        let data_directory_str: Cow<'_, str> = (&data_directory_str).into();
91
92        // Set the global variables (only if not already set)
93        let _ = STATE_STORE_URL.set(state_store_url_str.to_string());
94        let _ = DATA_DIRECTORY.set(data_directory_str.to_string());
95
96        Ok(())
97    });
98}
99
100#[unsafe(no_mangle)]
101pub extern "system" fn Java_com_risingwave_java_binding_Binding_putObject(
102    env: EnvParam<'_>,
103    object_name: JString<'_>,
104    data: JByteArray<'_>,
105) {
106    execute_and_catch(env, move |env| {
107        let object_name = env.get_string(&object_name).map_err(|e| anyhow!(e))?;
108        let object_name: Cow<'_, str> = (&object_name).into();
109        // Security check: validate file extension before any operation
110        validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
111        let object_name = prepend_data_directory(&object_name);
112        let data_guard = to_guarded_slice(&data, env).map_err(|e| anyhow!(e))?;
113        let data: Vec<u8> = data_guard.slice.to_vec();
114        JAVA_BINDING_ASYNC_RUNTIME
115            .block_on(async {
116                let object_store = get_object_store().await;
117                object_store.upload(&object_name, data.into()).await
118            })
119            .map_err(|e| anyhow!(e))?;
120        Ok(())
121    });
122}
123
124#[unsafe(no_mangle)]
125pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObject<'a>(
126    env: EnvParam<'a>,
127    object_name: JString<'a>,
128) -> JByteArray<'a> {
129    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
130        let object_name = env.get_string(&object_name)?;
131        let object_name: Cow<'_, str> = (&object_name).into();
132
133        // Security check: validate file extension before any operation
134        validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
135        let object_name = prepend_data_directory(&object_name);
136        let result = JAVA_BINDING_ASYNC_RUNTIME
137            .block_on(async {
138                let object_store = get_object_store().await;
139                object_store.read(&object_name, ..).await
140            })
141            .map_err(|e| anyhow!(e))?;
142
143        Ok(env.byte_array_from_slice(&result)?)
144    })
145}
146
147#[unsafe(no_mangle)]
148pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObjectStoreType<'a>(
149    env: EnvParam<'a>,
150) -> JString<'a> {
151    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
152        let media_type = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
153            let object_store = get_object_store().await;
154            object_store.media_type().to_owned()
155        });
156        Ok(env.new_string(media_type)?)
157    })
158}
159
160fn strip_data_directory_prefix(path: &str) -> Result<String, String> {
161    let data_directory = DATA_DIRECTORY
162        .get()
163        .map(|s| s.as_str())
164        .ok_or("expect DATA_DIRECTORY")?;
165    let relative_path = path.strip_prefix(data_directory).ok_or_else(|| {
166        format!(
167            "DATA_DIRECTORY {} is not prefix of path {}",
168            data_directory, path
169        )
170    })?;
171    let stripped = if let Some(stripped) = relative_path.strip_prefix('/') {
172        stripped.to_owned()
173    } else {
174        relative_path.to_owned()
175    };
176    Ok(stripped)
177}
178
179#[unsafe(no_mangle)]
180pub extern "system" fn Java_com_risingwave_java_binding_Binding_listObject<'a>(
181    env: EnvParam<'a>,
182    dir: JString<'a>,
183) -> jni::sys::jobjectArray {
184    **execute_and_catch(env, move |env: &mut EnvParam<'_>| {
185        let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
186        let dir: Cow<'_, str> = (&dir).into();
187
188        // Note: listObject operates on directories, individual file security is checked in putObject/getObject
189
190        let dir = prepend_data_directory(&dir);
191
192        let files: Vec<String> = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
193            let object_store = get_object_store().await;
194            let mut prefix_stripped_paths = Vec::new();
195            let mut stream = object_store
196                .list(&dir, None, None)
197                .await
198                .map_err(|e| anyhow!(e))?;
199            use futures::StreamExt;
200            while let Some(obj) = stream.next().await {
201                let obj = obj.map_err(|e| anyhow!(e))?;
202                // Additional security: only return files that pass validation
203                // Remove the data directory prefix for validation
204                let relative_path =
205                    strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
206                if validate_dat_file_extension(&relative_path).is_ok() {
207                    prefix_stripped_paths.push(relative_path);
208                } else {
209                    tracing::warn!("Filtering out non-.dat file from list: {}", obj.key);
210                }
211            }
212            Ok::<_, anyhow::Error>(prefix_stripped_paths)
213        })?;
214
215        let string_class = env.find_class("java/lang/String").map_err(|e| anyhow!(e))?;
216        let array = env
217            .new_object_array(files.len() as i32, string_class, JObject::null())
218            .map_err(|e| anyhow!(e))?;
219        for (i, file) in files.iter().enumerate() {
220            let jstr = env.new_string(file).map_err(|e| anyhow!(e))?;
221            env.set_object_array_element(&array, i as i32, &jstr)
222                .map_err(|e| anyhow!(e))?;
223        }
224        Ok(array)
225    })
226}
227
228#[unsafe(no_mangle)]
229pub extern "system" fn Java_com_risingwave_java_binding_Binding_deleteObjects<'a>(
230    env: EnvParam<'a>,
231    dir: JString<'a>,
232) {
233    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
234        let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
235        let dir: Cow<'_, str> = (&dir).into();
236
237        // Note: deleteObjects operates on directories, individual file security is checked in putObject/getObject
238
239        let dir = prepend_data_directory(&dir);
240
241        JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
242            let object_store = get_object_store().await;
243            let mut keys = Vec::new();
244            let mut stream = object_store
245                .list(&dir, None, None)
246                .await
247                .map_err(|e| anyhow!(e))?;
248            use futures::StreamExt;
249            while let Some(obj) = stream.next().await {
250                let obj = obj.map_err(|e| anyhow!(e))?;
251                // Additional security: only delete files that pass validation
252                // Remove the data directory prefix for validation
253                let relative_path =
254                    strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
255                if validate_dat_file_extension(&relative_path).is_ok() {
256                    keys.push(obj.key);
257                } else {
258                    tracing::warn!("Skipping deletion of non-.dat file: {}", obj.key);
259                }
260            }
261            tracing::debug!(?keys, "Deleting schema history files");
262            object_store
263                .delete_objects(&keys)
264                .await
265                .map_err(|e| anyhow!(e))?;
266            Ok::<_, anyhow::Error>(())
267        })?;
268        Ok(())
269    });
270}