risingwave_jni_core/
opendal_schema_history.rs1use std::borrow::Cow;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use jni::objects::{JByteArray, JObject, JString};
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_common::{DATA_DIRECTORY, STATE_STORE_URL};
22use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
23use risingwave_object_store::object::{ObjectStoreImpl, build_remote_object_store};
24use tokio::sync::OnceCell;
25
26use crate::{EnvParam, JAVA_BINDING_ASYNC_RUNTIME, execute_and_catch, to_guarded_slice};
27
28static OBJECT_STORE_INSTANCE: OnceCell<Arc<ObjectStoreImpl>> = OnceCell::const_new();
29
/// Checks that `path` names a schema-history data file.
///
/// Only non-empty paths ending in `.dat` are accepted, so the JNI bindings in this module can
/// only operate on schema-history files.
///
/// # Errors
/// Returns a human-readable message when `path` is empty or does not end in `.dat`.
fn validate_dat_file_extension(path: &str) -> Result<(), String> {
    match path {
        "" => Err("File path cannot be empty".to_owned()),
        candidate if candidate.ends_with(".dat") => Ok(()),
        rejected => Err(format!(
            "Security violation: Only .dat files are allowed for schema history operations, got: {}",
            rejected
        )),
    }
}
46
47fn prepend_data_directory(path: &str) -> String {
49 let data_dir = DATA_DIRECTORY.get().map(|s| s.as_str()).expect(
50 "DATA_DIRECTORY is not set. This is dangerous in cloud environments as it can cause data conflicts between multiple instances sharing the same bucket. Please ensure data_directory is properly configured."
51 );
52
53 if data_dir.ends_with('/') || path.starts_with('/') {
54 format!("{}{}", data_dir, path)
55 } else {
56 format!("{}/{}", data_dir, path)
57 }
58}
59
60async fn get_object_store() -> Arc<ObjectStoreImpl> {
61 OBJECT_STORE_INSTANCE
62 .get_or_init(|| async {
63 let hummock_url = STATE_STORE_URL.get().unwrap();
64 let object_store = build_remote_object_store(
65 hummock_url.strip_prefix("hummock+").unwrap_or("memory"),
66 Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()),
67 "rw-cdc-schema-history",
68 Arc::new(ObjectStoreConfig::default()),
69 )
70 .await;
71 Arc::new(object_store)
72 })
73 .await
74 .clone()
75}
76
77#[unsafe(no_mangle)]
80pub extern "system" fn Java_com_risingwave_java_binding_Binding_initObjectStoreForTest(
81 env: EnvParam<'_>,
82 state_store_url: JString<'_>,
83 data_directory: JString<'_>,
84) {
85 execute_and_catch(env, move |env| {
86 let state_store_url_str = env.get_string(&state_store_url).map_err(|e| anyhow!(e))?;
87 let state_store_url_str: Cow<'_, str> = (&state_store_url_str).into();
88
89 let data_directory_str = env.get_string(&data_directory).map_err(|e| anyhow!(e))?;
90 let data_directory_str: Cow<'_, str> = (&data_directory_str).into();
91
92 let _ = STATE_STORE_URL.set(state_store_url_str.to_string());
94 let _ = DATA_DIRECTORY.set(data_directory_str.to_string());
95
96 Ok(())
97 });
98}
99
100#[unsafe(no_mangle)]
101pub extern "system" fn Java_com_risingwave_java_binding_Binding_putObject(
102 env: EnvParam<'_>,
103 object_name: JString<'_>,
104 data: JByteArray<'_>,
105) {
106 execute_and_catch(env, move |env| {
107 let object_name = env.get_string(&object_name).map_err(|e| anyhow!(e))?;
108 let object_name: Cow<'_, str> = (&object_name).into();
109 validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
111 let object_name = prepend_data_directory(&object_name);
112 let data_guard = to_guarded_slice(&data, env).map_err(|e| anyhow!(e))?;
113 let data: Vec<u8> = data_guard.slice.to_vec();
114 JAVA_BINDING_ASYNC_RUNTIME
115 .block_on(async {
116 let object_store = get_object_store().await;
117 object_store.upload(&object_name, data.into()).await
118 })
119 .map_err(|e| anyhow!(e))?;
120 Ok(())
121 });
122}
123
124#[unsafe(no_mangle)]
125pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObject<'a>(
126 env: EnvParam<'a>,
127 object_name: JString<'a>,
128) -> JByteArray<'a> {
129 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
130 let object_name = env.get_string(&object_name)?;
131 let object_name: Cow<'_, str> = (&object_name).into();
132
133 validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
135 let object_name = prepend_data_directory(&object_name);
136 let result = JAVA_BINDING_ASYNC_RUNTIME
137 .block_on(async {
138 let object_store = get_object_store().await;
139 object_store.read(&object_name, ..).await
140 })
141 .map_err(|e| anyhow!(e))?;
142
143 Ok(env.byte_array_from_slice(&result)?)
144 })
145}
146
147#[unsafe(no_mangle)]
148pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObjectStoreType<'a>(
149 env: EnvParam<'a>,
150) -> JString<'a> {
151 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
152 let media_type = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
153 let object_store = get_object_store().await;
154 object_store.media_type().to_owned()
155 });
156 Ok(env.new_string(media_type)?)
157 })
158}
159
160fn strip_data_directory_prefix(path: &str) -> Result<String, String> {
161 let data_directory = DATA_DIRECTORY
162 .get()
163 .map(|s| s.as_str())
164 .ok_or("expect DATA_DIRECTORY")?;
165 let relative_path = path.strip_prefix(data_directory).ok_or_else(|| {
166 format!(
167 "DATA_DIRECTORY {} is not prefix of path {}",
168 data_directory, path
169 )
170 })?;
171 let stripped = if let Some(stripped) = relative_path.strip_prefix('/') {
172 stripped.to_owned()
173 } else {
174 relative_path.to_owned()
175 };
176 Ok(stripped)
177}
178
179#[unsafe(no_mangle)]
180pub extern "system" fn Java_com_risingwave_java_binding_Binding_listObject<'a>(
181 env: EnvParam<'a>,
182 dir: JString<'a>,
183) -> jni::sys::jobjectArray {
184 **execute_and_catch(env, move |env: &mut EnvParam<'_>| {
185 let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
186 let dir: Cow<'_, str> = (&dir).into();
187
188 let dir = prepend_data_directory(&dir);
191
192 let files: Vec<String> = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
193 let object_store = get_object_store().await;
194 let mut prefix_stripped_paths = Vec::new();
195 let mut stream = object_store
196 .list(&dir, None, None)
197 .await
198 .map_err(|e| anyhow!(e))?;
199 use futures::StreamExt;
200 while let Some(obj) = stream.next().await {
201 let obj = obj.map_err(|e| anyhow!(e))?;
202 let relative_path =
205 strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
206 if validate_dat_file_extension(&relative_path).is_ok() {
207 prefix_stripped_paths.push(relative_path);
208 } else {
209 tracing::warn!("Filtering out non-.dat file from list: {}", obj.key);
210 }
211 }
212 Ok::<_, anyhow::Error>(prefix_stripped_paths)
213 })?;
214
215 let string_class = env.find_class("java/lang/String").map_err(|e| anyhow!(e))?;
216 let array = env
217 .new_object_array(files.len() as i32, string_class, JObject::null())
218 .map_err(|e| anyhow!(e))?;
219 for (i, file) in files.iter().enumerate() {
220 let jstr = env.new_string(file).map_err(|e| anyhow!(e))?;
221 env.set_object_array_element(&array, i as i32, &jstr)
222 .map_err(|e| anyhow!(e))?;
223 }
224 Ok(array)
225 })
226}
227
228#[unsafe(no_mangle)]
229pub extern "system" fn Java_com_risingwave_java_binding_Binding_deleteObjects<'a>(
230 env: EnvParam<'a>,
231 dir: JString<'a>,
232) {
233 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
234 let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
235 let dir: Cow<'_, str> = (&dir).into();
236
237 let dir = prepend_data_directory(&dir);
240
241 JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
242 let object_store = get_object_store().await;
243 let mut keys = Vec::new();
244 let mut stream = object_store
245 .list(&dir, None, None)
246 .await
247 .map_err(|e| anyhow!(e))?;
248 use futures::StreamExt;
249 while let Some(obj) = stream.next().await {
250 let obj = obj.map_err(|e| anyhow!(e))?;
251 let relative_path =
254 strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
255 if validate_dat_file_extension(&relative_path).is_ok() {
256 keys.push(obj.key);
257 } else {
258 tracing::warn!("Skipping deletion of non-.dat file: {}", obj.key);
259 }
260 }
261 tracing::debug!(?keys, "Deleting schema history files");
262 object_store
263 .delete_objects(&keys)
264 .await
265 .map_err(|e| anyhow!(e))?;
266 Ok::<_, anyhow::Error>(())
267 })?;
268 Ok(())
269 });
270}