risingwave_jni_core/
jvm_runtime.rs1use std::ffi::c_void;
16use std::path::PathBuf;
17use std::sync::OnceLock;
18
19use anyhow::{Context, bail};
20use fs_err as fs;
21use fs_err::PathExt;
22use jni::objects::{JObject, JString};
23use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
24use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
25use thiserror_ext::AsReport;
26use tracing::error;
27
28use crate::{call_method, call_static_method};
29
/// Fraction of the available system memory used as the default JVM maximum heap size
/// (`-Xmx`) when the `JVM_HEAP_SIZE` environment variable is not set.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
32
/// Global accessor for the embedded JVM; the `JavaVM` itself is stored in `INSTANCE`.
pub static JVM: JavaVmWrapper = JavaVmWrapper;
/// Backing cell for the lazily created `JavaVM`, filled by `JavaVmWrapper::get_or_init`.
static INSTANCE: OnceLock<JavaVM> = OnceLock::new();

/// Zero-sized handle whose methods create and look up the `JavaVM` held in `INSTANCE`.
pub struct JavaVmWrapper;
37
38impl JavaVmWrapper {
39 pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> {
42 match INSTANCE.get_or_try_init(Self::inner_new) {
43 Ok(jvm) => Ok(jvm),
44 Err(e) => {
45 error!(error = %e.as_report(), "jvm not initialized properly");
46 Err(e.context("jvm not initialized properly"))
47 }
48 }
49 }
50
51 fn get(&self) -> Option<&'static JavaVM> {
55 INSTANCE.get()
56 }
57
58 fn locate_libs_path() -> anyhow::Result<PathBuf> {
59 let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
60 PathBuf::from(libs_path)
61 } else {
62 tracing::info!(
63 "environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"
64 );
65 std::env::current_exe()
66 .and_then(|p| p.fs_err_canonicalize()) .context("unable to get path of the executable")?
68 .parent()
69 .expect("not root")
70 .join("libs")
71 };
72
73 Ok(libs_path)
75 }
76
77 fn inner_new() -> anyhow::Result<JavaVM> {
78 let libs_path = Self::locate_libs_path().context("failed to locate connector libs")?;
79 tracing::info!(path = %libs_path.display(), "located connector libs");
80
81 let mut class_vec = vec![];
82
83 let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) {
84 "failed to read connector libs; \
85 for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure`
86 "
87 } else {
88 "failed to read connector libs, \
89 please check if env var CONNECTOR_LIBS_PATH is correctly configured"
90 })?;
91 for entry in entries.flatten() {
92 let entry_path = entry.path();
93 if entry_path.file_name().is_some() {
94 let path = fs::canonicalize(entry_path)
95 .expect("invalid entry_path obtained from fs::read_dir");
96 class_vec.push(path.to_str().unwrap().to_owned());
97 }
98 }
99
100 let mut new_class_vec = Vec::with_capacity(class_vec.len());
103 for path in class_vec {
104 if path.contains("risingwave-source-cdc") {
105 new_class_vec.insert(0, path.clone());
106 } else {
107 new_class_vec.push(path.clone());
108 }
109 }
110 class_vec = new_class_vec;
111
112 let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
113 heap_size
114 } else {
115 format!(
116 "{}",
117 (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
118 )
119 };
120
121 let args_builder = InitArgsBuilder::new()
124 .version(JNIVersion::V8)
126 .option("-Dis_embedded_connector=true")
127 .option(format!("-Djava.class.path={}", class_vec.join(":")))
128 .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
129 .option("-Xms16m")
130 .option(format!("-Xmx{}", jvm_heap_size));
131
132 tracing::info!("JVM args: {:?}", args_builder);
133 let jvm_args = args_builder.build().context("invalid jvm args")?;
134
135 let jvm = match JavaVM::new(jvm_args) {
137 Err(err) => {
138 tracing::error!(error = ?err.as_report(), "fail to new JVM");
139 bail!("fail to new JVM");
140 }
141 Ok(jvm) => jvm,
142 };
143
144 tracing::info!("initialize JVM successfully");
145
146 let result: std::result::Result<(), jni::errors::Error> = try {
147 let mut env = jvm_env(&jvm)?;
148 register_java_binding_native_methods(&mut env)?;
149 };
150
151 result.context("failed to register native method")?;
152
153 Ok(jvm)
154 }
155}
156
157pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
158 jvm.attach_current_thread()
159 .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
160}
161
162pub fn register_java_binding_native_methods(
163 env: &mut JNIEnv<'_>,
164) -> Result<(), jni::errors::Error> {
165 let binding_class = env
166 .find_class(gen_class_name!(com.risingwave.java.binding.Binding))
167 .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
168 use crate::*;
169 macro_rules! gen_native_method_array {
170 () => {{
171 $crate::for_all_native_methods! {gen_native_method_array}
172 }};
173 ({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
174 [
175 $(
176 $crate::gen_native_method_entry! {
177 Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
178 },
179 )*
180 ]
181 }
182 }
183 env.register_native_methods(binding_class, &gen_native_method_array!())
184 .inspect_err(
185 |e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
186 )?;
187
188 tracing::info!("register native methods for jvm successfully");
189 Ok(())
190}
191
192pub fn load_jvm_memory_stats() -> (usize, usize) {
195 match JVM.get() {
196 Some(jvm) => {
197 let result: Result<(usize, usize), anyhow::Error> = try {
198 execute_with_jni_env(jvm, |env| {
199 let runtime_instance = crate::call_static_method!(
200 env,
201 {Runtime},
202 {Runtime getRuntime()}
203 )?;
204
205 let total_memory =
206 call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
207 let free_memory =
208 call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
209
210 Ok((total_memory as usize, (total_memory - free_memory) as usize))
211 })?
212 };
213 match result {
214 Ok(ret) => ret,
215 Err(e) => {
216 error!(error = ?e.as_report(), "failed to collect jvm stats");
217 (0, 0)
218 }
219 }
220 }
221 _ => (0, 0),
222 }
223}
224
225pub fn execute_with_jni_env<T>(
226 jvm: &JavaVM,
227 f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
228) -> anyhow::Result<T> {
229 let mut env = jvm
230 .attach_current_thread()
231 .with_context(|| "Failed to attach current rust thread to jvm")?;
232
233 let thread = crate::call_static_method!(
238 env,
239 {Thread},
240 {Thread currentThread()}
241 )?;
242
243 let system_class_loader = crate::call_static_method!(
244 env,
245 {ClassLoader},
246 {ClassLoader getSystemClassLoader()}
247 )?;
248
249 crate::call_method!(
250 env,
251 thread,
252 {void setContextClassLoader(ClassLoader)},
253 &system_class_loader
254 )?;
255
256 let ret = f(&mut env);
257
258 match env.exception_check() {
259 Ok(true) => {
260 let exception = env.exception_occurred().inspect_err(|e| {
261 tracing::warn!(error = %e.as_report(), "Failed to get jvm exception");
262 })?;
263 env.exception_describe().inspect_err(|e| {
264 tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
265 })?;
266 env.exception_clear().inspect_err(|e| {
267 tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
268 })?;
269 let message = call_method!(env, exception, {String getMessage()})?;
270 let message = jobj_to_str(&mut env, message)?;
271 return Err(anyhow::anyhow!("Caught Java Exception: {}", message));
272 }
273 Ok(false) => {
274 }
276 Err(e) => {
277 tracing::warn!(error = %e.as_report(), "Failed to check exception");
278 }
279 }
280
281 ret
282}
283
284pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
286 if !env.is_instance_of(&obj, "java/lang/String")? {
287 bail!("Input object is not a java string and can't be converted!")
288 }
289 let jstr = JString::from(obj);
290 let java_str = env.get_string(&jstr)?;
291 Ok(java_str.to_str()?.to_owned())
292}
293
294pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
302 match JVM.get() {
303 None => Ok(None),
304 Some(jvm) => execute_with_jni_env(jvm, |env| {
305 let result = call_static_method!(
306 env,
307 {com.risingwave.connector.api.Monitor},
308 {String dumpStackTrace()}
309 )
310 .with_context(|| "Failed to call Java function")?;
311 let result = JString::from(result);
312 let result = env
313 .get_string(&result)
314 .with_context(|| "Failed to convert JString")?;
315 let result = result
316 .to_str()
317 .with_context(|| "Failed to convert JavaStr")?;
318 Ok(Some(result.to_owned()))
319 }),
320 }
321}