risingwave_jni_core/
opendal_schema_history.rs1use std::borrow::Cow;
16use std::sync::{Arc, OnceLock};
17use std::time::Duration;
18
19use anyhow::anyhow;
20use jni::objects::{JByteArray, JObject, JString};
21use risingwave_common::config::ObjectStoreConfig;
22use risingwave_common::{DATA_DIRECTORY, STATE_STORE_URL};
23use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
24use risingwave_object_store::object::{ObjectError, ObjectStoreImpl, build_remote_object_store};
25use thiserror_ext::AsReport;
26use tokio::sync::OnceCell;
27
28use crate::{EnvParam, JAVA_BINDING_ASYNC_RUNTIME, execute_and_catch, to_guarded_slice};
29
30static OBJECT_STORE_INSTANCE: OnceCell<Arc<ObjectStoreImpl>> = OnceCell::const_new();
31
32const PUT_OBJECT_TOTAL_TIMEOUT_ENV: &str = "RW_CDC_SCHEMA_HISTORY_PUT_OBJECT_TIMEOUT_MS";
33const DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS: u64 = 3_000;
34
35fn put_object_total_timeout() -> Option<Duration> {
40 static TIMEOUT: OnceLock<Option<Duration>> = OnceLock::new();
41 *TIMEOUT.get_or_init(|| {
42 let ms = match std::env::var(PUT_OBJECT_TOTAL_TIMEOUT_ENV) {
43 Ok(v) => match v.trim().parse::<u64>() {
44 Ok(ms) => ms,
45 Err(e) => {
46 tracing::warn!(
47 error = %e.as_report(),
48 value = %v,
49 "Invalid {}, falling back to default {}ms",
50 PUT_OBJECT_TOTAL_TIMEOUT_ENV,
51 DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS,
52 );
53 DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS
54 }
55 },
56 Err(_) => DEFAULT_PUT_OBJECT_TOTAL_TIMEOUT_MS,
57 };
58
59 if ms == 0 {
60 None
61 } else {
62 Some(Duration::from_millis(ms))
63 }
64 })
65}
66
/// Checks that `path` names a schema history data file.
///
/// Only non-empty paths ending in `.dat` are accepted; anything else is rejected with a
/// human-readable error message.
fn validate_dat_file_extension(path: &str) -> Result<(), String> {
    match path {
        "" => Err("File path cannot be empty".to_owned()),
        p if p.ends_with(".dat") => Ok(()),
        p => Err(format!(
            "Security violation: Only .dat files are allowed for schema history operations, got: {p}"
        )),
    }
}
83
84fn prepend_data_directory(path: &str) -> String {
86 let data_dir = DATA_DIRECTORY.get().map(|s| s.as_str()).expect(
87 "DATA_DIRECTORY is not set. This is dangerous in cloud environments as it can cause data conflicts between multiple instances sharing the same bucket. Please ensure data_directory is properly configured."
88 );
89
90 if data_dir.ends_with('/') || path.starts_with('/') {
91 format!("{}{}", data_dir, path)
92 } else {
93 format!("{}/{}", data_dir, path)
94 }
95}
96
97async fn get_object_store() -> Arc<ObjectStoreImpl> {
98 OBJECT_STORE_INSTANCE
99 .get_or_init(|| async {
100 let hummock_url = STATE_STORE_URL.get().unwrap();
101 let object_store = build_remote_object_store(
102 hummock_url.strip_prefix("hummock+").unwrap_or("memory"),
103 Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()),
104 "rw-cdc-schema-history",
105 Arc::new(ObjectStoreConfig::default()),
106 )
107 .await;
108 Arc::new(object_store)
109 })
110 .await
111 .clone()
112}
113
114#[unsafe(no_mangle)]
117pub extern "system" fn Java_com_risingwave_java_binding_Binding_initObjectStoreForTest(
118 env: EnvParam<'_>,
119 state_store_url: JString<'_>,
120 data_directory: JString<'_>,
121) {
122 execute_and_catch(env, move |env| {
123 let state_store_url_str = env.get_string(&state_store_url).map_err(|e| anyhow!(e))?;
124 let state_store_url_str: Cow<'_, str> = (&state_store_url_str).into();
125
126 let data_directory_str = env.get_string(&data_directory).map_err(|e| anyhow!(e))?;
127 let data_directory_str: Cow<'_, str> = (&data_directory_str).into();
128
129 let _ = STATE_STORE_URL.set(state_store_url_str.to_string());
131 let _ = DATA_DIRECTORY.set(data_directory_str.to_string());
132
133 Ok(())
134 });
135}
136
137#[unsafe(no_mangle)]
138pub extern "system" fn Java_com_risingwave_java_binding_Binding_putObject(
139 env: EnvParam<'_>,
140 object_name: JString<'_>,
141 data: JByteArray<'_>,
142) {
143 execute_and_catch(env, move |env| {
144 let object_name = env.get_string(&object_name).map_err(|e| anyhow!(e))?;
145 let object_name: Cow<'_, str> = (&object_name).into();
146 validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
148 let object_name = prepend_data_directory(&object_name);
149 let data_guard = to_guarded_slice(&data, env).map_err(|e| anyhow!(e))?;
150 let data: Vec<u8> = data_guard.slice.to_vec();
151 let total_timeout = put_object_total_timeout();
152 JAVA_BINDING_ASYNC_RUNTIME
153 .block_on(async {
154 let f = async {
155 let object_store = get_object_store().await;
156 object_store.upload(&object_name, data.into()).await
157 };
158
159 match total_timeout {
160 Some(timeout) => match tokio::time::timeout(timeout, f).await {
161 Ok(res) => res,
162 Err(_) => Err(ObjectError::timeout(format!(
163 "[Schema history] putObject timed out after {:?}. You can adjust it via {} (ms), or set it to 0 to disable.",
164 timeout, PUT_OBJECT_TOTAL_TIMEOUT_ENV
165 ))),
166 },
167 None => f.await,
168 }
169 })
170 .map_err(|e| anyhow!(e))?;
171 Ok(())
172 });
173}
174
175#[unsafe(no_mangle)]
176pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObject<'a>(
177 env: EnvParam<'a>,
178 object_name: JString<'a>,
179) -> JByteArray<'a> {
180 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
181 let object_name = env.get_string(&object_name)?;
182 let object_name: Cow<'_, str> = (&object_name).into();
183
184 validate_dat_file_extension(&object_name).map_err(|e| anyhow!(e))?;
186 let object_name = prepend_data_directory(&object_name);
187 let result = JAVA_BINDING_ASYNC_RUNTIME
188 .block_on(async {
189 let object_store = get_object_store().await;
190 object_store.read(&object_name, ..).await
191 })
192 .map_err(|e| anyhow!(e))?;
193
194 Ok(env.byte_array_from_slice(&result)?)
195 })
196}
197
198#[unsafe(no_mangle)]
199pub extern "system" fn Java_com_risingwave_java_binding_Binding_getObjectStoreType<'a>(
200 env: EnvParam<'a>,
201) -> JString<'a> {
202 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
203 let media_type = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
204 let object_store = get_object_store().await;
205 object_store.media_type().to_owned()
206 });
207 Ok(env.new_string(media_type)?)
208 })
209}
210
211fn strip_data_directory_prefix(path: &str) -> Result<String, String> {
212 let data_directory = DATA_DIRECTORY
213 .get()
214 .map(|s| s.as_str())
215 .ok_or("expect DATA_DIRECTORY")?;
216 let relative_path = path.strip_prefix(data_directory).ok_or_else(|| {
217 format!(
218 "DATA_DIRECTORY {} is not prefix of path {}",
219 data_directory, path
220 )
221 })?;
222 let stripped = if let Some(stripped) = relative_path.strip_prefix('/') {
223 stripped.to_owned()
224 } else {
225 relative_path.to_owned()
226 };
227 Ok(stripped)
228}
229
230#[unsafe(no_mangle)]
231pub extern "system" fn Java_com_risingwave_java_binding_Binding_listObject<'a>(
232 env: EnvParam<'a>,
233 dir: JString<'a>,
234) -> jni::sys::jobjectArray {
235 **execute_and_catch(env, move |env: &mut EnvParam<'_>| {
236 let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
237 let dir: Cow<'_, str> = (&dir).into();
238
239 let dir = prepend_data_directory(&dir);
242
243 let files: Vec<String> = JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
244 let object_store = get_object_store().await;
245 let mut prefix_stripped_paths = Vec::new();
246 let mut stream = object_store
247 .list(&dir, None, None)
248 .await
249 .map_err(|e| anyhow!(e))?;
250 use futures::StreamExt;
251 while let Some(obj) = stream.next().await {
252 let obj = obj.map_err(|e| anyhow!(e))?;
253 let relative_path =
256 strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
257 if validate_dat_file_extension(&relative_path).is_ok() {
258 prefix_stripped_paths.push(relative_path);
259 } else {
260 tracing::warn!("Filtering out non-.dat file from list: {}", obj.key);
261 }
262 }
263 Ok::<_, anyhow::Error>(prefix_stripped_paths)
264 })?;
265
266 let string_class = env.find_class("java/lang/String").map_err(|e| anyhow!(e))?;
267 let array = env
268 .new_object_array(files.len() as i32, string_class, JObject::null())
269 .map_err(|e| anyhow!(e))?;
270 for (i, file) in files.iter().enumerate() {
271 let jstr = env.new_string(file).map_err(|e| anyhow!(e))?;
272 env.set_object_array_element(&array, i as i32, &jstr)
273 .map_err(|e| anyhow!(e))?;
274 }
275 Ok(array)
276 })
277}
278
279#[unsafe(no_mangle)]
280pub extern "system" fn Java_com_risingwave_java_binding_Binding_deleteObjects<'a>(
281 env: EnvParam<'a>,
282 dir: JString<'a>,
283) {
284 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
285 let dir = env.get_string(&dir).map_err(|e| anyhow!(e))?;
286 let dir: Cow<'_, str> = (&dir).into();
287
288 let dir = prepend_data_directory(&dir);
291
292 JAVA_BINDING_ASYNC_RUNTIME.block_on(async {
293 let object_store = get_object_store().await;
294 let mut keys = Vec::new();
295 let mut stream = object_store
296 .list(&dir, None, None)
297 .await
298 .map_err(|e| anyhow!(e))?;
299 use futures::StreamExt;
300 while let Some(obj) = stream.next().await {
301 let obj = obj.map_err(|e| anyhow!(e))?;
302 let relative_path =
305 strip_data_directory_prefix(&obj.key).map_err(|e| anyhow!(e))?;
306 if validate_dat_file_extension(&relative_path).is_ok() {
307 keys.push(obj.key);
308 } else {
309 tracing::warn!("Skipping deletion of non-.dat file: {}", obj.key);
310 }
311 }
312 tracing::debug!(?keys, "Deleting schema history files");
313 object_store
314 .delete_objects(&keys)
315 .await
316 .map_err(|e| anyhow!(e))?;
317 Ok::<_, anyhow::Error>(())
318 })?;
319 Ok(())
320 });
321}