risingwave_jni_core/
opendal_schema_history.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use anyhow::anyhow;
20use jni::objects::{JByteArray, JObject, JString};
21use risingwave_common::config::ObjectStoreConfig;
22use risingwave_common::{DATA_DIRECTORY, STATE_STORE_URL};
23use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
24use risingwave_object_store::object::{ObjectError, ObjectStoreImpl, build_remote_object_store};
25use thiserror_ext::AsReport;
26use tokio::sync::OnceCell;
27
28use crate::{EnvParam, JAVA_BINDING_ASYNC_RUNTIME, execute_and_catch, to_guarded_slice};
29
30static OBJECT_STORE_INSTANCE: OnceCell<Arc<ObjectStoreImpl>> = OnceCell::const_new();
31
32const PUT_OBJECT_TOTAL_TIMEOUT_ENV: &str = "RW_CDC_SCHEMA_HISTORY_PUT_OBJECT_TIMEOUT_MS";
33const DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS: u64 = 3_000;
34
35/// Total timeout for `Binding_putObject`.
36///
37/// - Defaults to 3 seconds.
38/// - Set `RW_CDC_SCHEMA_HISTORY_PUT_OBJECT_TIMEOUT_MS=0` to disable the timeout.
39fn put_object_total_timeout() -> Option<Duration> {
40    static TIMEOUT: OnceLock<Option<Duration>> = OnceLock::new();
41    *TIMEOUT.get_or_init(|| {
42        let ms = match std::env::var(PUT_OBJECT_TOTAL_TIMEOUT_ENV) {
43            Ok(v) => match v.trim().parse::<u64>() {
44                Ok(ms) => ms,
45                Err(e) => {
46                    tracing::warn!(
47                        error = %e.as_report(),
48                        value = %v,
49                        "Invalid {}, falling back to default {}ms",
50                        PUT_OBJECT_TOTAL_TIMEOUT_ENV,
51                        DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS,
52                    );
53                    DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS
54                }
55            },
56            Err(_) => DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS,
57        };
58
59        if ms == 0 {
60            None
61        } else {
62            Some(Duration::from_millis(ms))
63        }
64    })
65}
66
/// Security safeguard: accept only object names that end with `.dat`.
///
/// Schema history shares the object store with other RisingWave state, so restricting the
/// bindings to `.dat` names guards against accidentally overwriting Hummock data or other
/// critical files.
///
/// # Errors
///
/// Returns a human-readable message when `path` is empty or lacks the `.dat` suffix.
fn validate_dat_file_extension(path: &str) -> Result<(), String> {
    match path {
        "" => Err("File path cannot be empty".to_owned()),
        p if p.ends_with(".dat") => Ok(()),
        p => Err(format!(
            "Security violation: Only .dat files are allowed for schema history operations, got: {p}"
        )),
    }
}
83
84// schema history is internal state, all data is stored under the DATA_DIRECTORY directory.
85fn prepend_data_directory(path: &str) -> String {
86    let data_dir = DATA_DIRECTORY.get().map(|s| s.as_str()).expect(
87        "DATA_DIRECTORY is not set. This is dangerous in cloud environments as it can cause data conflicts between multiple instances sharing the same bucket. Please ensure data_directory is properly configured."
88    );
89
90    if data_dir.ends_with('/') || path.starts_with('/') {
91        format!("{}{}", data_dir, path)
92    } else {
93        format!("{}/{}", data_dir, path)
94    }
95}
96
97async fn get_object_store() -> Arc<ObjectStoreImpl> {
98    OBJECT_STORE_INSTANCE
99        .get_or_init(|| async {
100            let hummock_url = STATE_STORE_URL.get().unwrap();
101            let object_store = build_remote_object_store(
102                hummock_url.strip_prefix("hummock+").unwrap_or("memory"),
103                Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()),
104                "rw-cdc-schema-history",
105                Arc::new(ObjectStoreConfig::default()),
106            )
107            .await;
108            Arc::new(object_store)
109        })
110        .await
111        .clone()
112}
113
114/// Initialize `STATE_STORE_URL` and `DATA_DIRECTORY` for integration tests.
115/// Must be called before any schema history operations if compute node is not running.
116#[unsafe(no_mangle)]
117pub extern "system" fn Java_com_risingwave_java_binding_Binding_initObjectStoreForTest(
118    env: EnvParam<'_>,
119    state_store_url: JString<'_>,
120    data_directory: JString<'_>,
121) {
122    execute_and_catch(env, move |env| {
123        let state_store_url_str = env.get_string(&state_store_url).map_err(|e| anyhow!(e))?;
124        let state_store_url_str: Cow<'_, str> = (&state_store_url_str).into();
125
126        let data_directory_str = env.get_string(&data_directory).map_err(|e| anyhow!(e))?;
127        let data_directory_str: Cow<'_, str> = (&data_directory_str).into();
128
129        // Set the global variables (only if not already set)
130        let _ = STATE_STORE_URL.set(state_store_url_str.to_string());
131        let _ = DATA_DIRECTORY.set(data_directory_str.to_string());
132
133        Ok(())
134    });
135}
136
137#[unsafe(no_mangle)]
138pub extern "system" fn Java_com_risingwave_java_binding_Binding_putObject(
139    env: EnvParam<'_>,
140    object_name: JString<'_>,
141    data: JByteArray<'_>,
142) {
143    execute_and_catch(env, move |env| {
144        let object_name = env.get_string(&object_name).map_err(|e| anyhow!(e))?;
145        let object_name: Cow<'_, str> = (&object_name).into();
146        // Security check: validate file extension before any operation
147        validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
148        let object_name = prepend_data_directory(&object_name);
149        let data_guard = to_guarded_slice(&data, env).map_err(|e| anyhow!(e))?;
150        let data: Vec<u8> = data_guard.slice.to_vec();
151        let total_timeout = put_object_total_timeout();
152        JAVA_BINDING_ASYNC_RUNTIME
153            .block_on(async {
154                let f = async {
155                    let object_store = get_object_store().await;
156                    object_store.upload(&object_name, data.into()).await
157                };
158
159                match total_timeout {
160                    Some(timeout) => match tokio::time::timeout(timeout, f).await {
161                        Ok(res) => res,
162                        Err(_) => Err(ObjectError::timeout(format!(
163                            "[Schema history] putObject timed out after {:?}. You can adjust it via {} (ms), or set it to 0 to disable.",
164                            timeout, PUT_OBJECT_TOTAL_TIMEOUT_ENV
165                        ))),
166                    },
167                    None => f.await,
168                }
169            })
170            .map_err(|e| anyhow!(e))?;
171        Ok(())
172    });
173}
174
175#[unsafe(no_mangle)]
176pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObject<'a>(
177    env: EnvParam<'a>,
178    object_name: JString<'a>,
179) -> JByteArray<'a> {
180    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
181        let object_name = env.get_string(&object_name)?;
182        let object_name: Cow<'_, str> = (&object_name).into();
183
184        // Security check: validate file extension before any operation
185        validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
186        let object_name = prepend_data_directory(&object_name);
187        let result = JAVA_BINDING_ASYNC_RUNTIME
188            .block_on(async {
189                let object_store = get_object_store().await;
190                object_store.read(&object_name, ..).await
191            })
192            .map_err(|e| anyhow!(e))?;
193
194        Ok(env.byte_array_from_slice(&result)?)
195    })
196}
197
198#[unsafe(no_mangle)]
199pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObjectStoreType<'a>(
200    env: EnvParam<'a>,
201) -> JString<'a> {
202    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
203        let media_type = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
204            let object_store = get_object_store().await;
205            object_store.media_type().to_owned()
206        });
207        Ok(env.new_string(media_type)?)
208    })
209}
210
211fn strip_data_directory_prefix(path: &str) -> Result<String, String> {
212    let data_directory = DATA_DIRECTORY
213        .get()
214        .map(|s| s.as_str())
215        .ok_or("expect DATA_DIRECTORY")?;
216    let relative_path = path.strip_prefix(data_directory).ok_or_else(|| {
217        format!(
218            "DATA_DIRECTORY {} is not prefix of path {}",
219            data_directory, path
220        )
221    })?;
222    let stripped = if let Some(stripped) = relative_path.strip_prefix('/') {
223        stripped.to_owned()
224    } else {
225        relative_path.to_owned()
226    };
227    Ok(stripped)
228}
229
230#[unsafe(no_mangle)]
231pub extern "system" fn Java_com_risingwave_java_binding_Binding_listObject<'a>(
232    env: EnvParam<'a>,
233    dir: JString<'a>,
234) -> jni::sys::jobjectArray {
235    **execute_and_catch(env, move |env: &mut EnvParam<'_>| {
236        let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
237        let dir: Cow<'_, str> = (&dir).into();
238
239        // Note: listObject operates on directories, individual file security is checked in putObject/getObject
240
241        let dir = prepend_data_directory(&dir);
242
243        let files: Vec<String> = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
244            let object_store = get_object_store().await;
245            let mut prefix_stripped_paths = Vec::new();
246            let mut stream = object_store
247                .list(&dir, None, None)
248                .await
249                .map_err(|e| anyhow!(e))?;
250            use futures::StreamExt;
251            while let Some(obj) = stream.next().await {
252                let obj = obj.map_err(|e| anyhow!(e))?;
253                // Additional security: only return files that pass validation
254                // Remove the data directory prefix for validation
255                let relative_path =
256                    strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
257                if validate_dat_file_extension(&relative_path).is_ok() {
258                    prefix_stripped_paths.push(relative_path);
259                } else {
260                    tracing::warn!("Filtering out non-.dat file from list: {}", obj.key);
261                }
262            }
263            Ok::<_, anyhow::Error>(prefix_stripped_paths)
264        })?;
265
266        let string_class = env.find_class("java/lang/String").map_err(|e| anyhow!(e))?;
267        let array = env
268            .new_object_array(files.len() as i32, string_class, JObject::null())
269            .map_err(|e| anyhow!(e))?;
270        for (i, file) in files.iter().enumerate() {
271            let jstr = env.new_string(file).map_err(|e| anyhow!(e))?;
272            env.set_object_array_element(&array, i as i32, &jstr)
273                .map_err(|e| anyhow!(e))?;
274        }
275        Ok(array)
276    })
277}
278
279#[unsafe(no_mangle)]
280pub extern "system" fn Java_com_risingwave_java_binding_Binding_deleteObjects<'a>(
281    env: EnvParam<'a>,
282    dir: JString<'a>,
283) {
284    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
285        let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
286        let dir: Cow<'_, str> = (&dir).into();
287
288        // Note: deleteObjects operates on directories, individual file security is checked in putObject/getObject
289
290        let dir = prepend_data_directory(&dir);
291
292        JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
293            let object_store = get_object_store().await;
294            let mut keys = Vec::new();
295            let mut stream = object_store
296                .list(&dir, None, None)
297                .await
298                .map_err(|e| anyhow!(e))?;
299            use futures::StreamExt;
300            while let Some(obj) = stream.next().await {
301                let obj = obj.map_err(|e| anyhow!(e))?;
302                // Additional security: only delete files that pass validation
303                // Remove the data directory prefix for validation
304                let relative_path =
305                    strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
306                if validate_dat_file_extension(&relative_path).is_ok() {
307                    keys.push(obj.key);
308                } else {
309                    tracing::warn!("Skipping deletion of non-.dat file: {}", obj.key);
310                }
311            }
312            tracing::debug!(?keys, "Deleting schema history files");
313            object_store
314                .delete_objects(&keys)
315                .await
316                .map_err(|e| anyhow!(e))?;
317            Ok::<_, anyhow::Error>(())
318        })?;
319        Ok(())
320    });
321}