risingwave_jni_core/
jvm_runtime.rs1use std::ffi::c_void;
16use std::path::PathBuf;
17
18use anyhow::{Context, bail};
19use fs_err as fs;
20use fs_err::PathExt;
21use jni::objects::{JObject, JString};
22use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
23use risingwave_common::global_jvm::{JVM_BUILDER, Jvm, JvmBuilder};
24use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
25use thiserror_ext::AsReport;
26use tracing::error;
27
28use crate::opendal_schema_history::*;
29use crate::{call_method, call_static_method};
30
31const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
33
34fn locate_libs_path() -> anyhow::Result<PathBuf> {
35 let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
36 PathBuf::from(libs_path)
37 } else {
38 tracing::info!(
39 "environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"
40 );
41 std::env::current_exe()
42 .and_then(|p| p.fs_err_canonicalize()) .context("unable to get path of the executable")?
44 .parent()
45 .expect("not root")
46 .join("libs")
47 };
48 Ok(libs_path)
49}
50
51fn build_jvm_with_native_registration() -> anyhow::Result<JavaVM> {
52 let libs_path = locate_libs_path().context("failed to locate connector libs")?;
53 tracing::info!(path = %libs_path.display(), "located connector libs");
54
55 let mut class_vec = vec![];
56
57 let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) {
58 "failed to read connector libs; \
59 for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure`"
60 } else {
61 "failed to read connector libs, \
62 please check if env var CONNECTOR_LIBS_PATH is correctly configured"
63 })?;
64 for entry in entries.flatten() {
65 let entry_path = entry.path();
66 if entry_path.file_name().is_some() {
67 let path = fs::canonicalize(entry_path)
68 .expect("invalid entry_path obtained from fs::read_dir");
69 class_vec.push(path.to_str().unwrap().to_owned());
70 }
71 }
72
73 let mut new_class_vec = Vec::with_capacity(class_vec.len());
76 for path in class_vec {
77 if path.contains("risingwave-source-cdc") {
78 new_class_vec.insert(0, path.clone());
79 } else {
80 new_class_vec.push(path.clone());
81 }
82 }
83 class_vec = new_class_vec;
84
85 let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
86 heap_size
87 } else {
88 format!(
89 "{}",
90 (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
91 )
92 };
93
94 let mut args_builder = InitArgsBuilder::new()
97 .version(JNIVersion::V8)
99 .option("-Dis_embedded_connector=true")
100 .option(format!("-Djava.class.path={}", class_vec.join(":")))
101 .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
102 .option("-Xms16m")
103 .option(format!("-Xmx{}", jvm_heap_size));
104
105 let exit_on_oom = std::env::var("JVM_EXIT_ON_OOM")
110 .map(|v| !matches!(v.to_ascii_lowercase().as_str(), "false" | "0" | ""))
111 .unwrap_or(true);
112 if exit_on_oom {
113 args_builder = args_builder.option("-XX:+ExitOnOutOfMemoryError");
114 }
115
116 tracing::info!("JVM args: {:?}", args_builder);
117 let jvm_args = args_builder.build().context("invalid jvm args")?;
118
119 let jvm = match JavaVM::new(jvm_args) {
121 Err(err) => {
122 tracing::error!(error = ?err.as_report(), "fail to new JVM");
123 bail!("fail to new JVM");
124 }
125 Ok(jvm) => jvm,
126 };
127
128 tracing::info!("initialize JVM successfully");
129
130 let result: std::result::Result<(), jni::errors::Error> = try {
131 let mut env = jvm_env(&jvm)?;
132 register_java_binding_native_methods(&mut env)?;
133 };
134
135 result.context("failed to register native method")?;
136
137 Ok(jvm)
138}
139
140pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
141 jvm.attach_current_thread()
142 .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
143}
144
145pub fn register_java_binding_native_methods(
146 env: &mut JNIEnv<'_>,
147) -> Result<(), jni::errors::Error> {
148 let binding_class = env
149 .find_class(gen_class_name!(com.risingwave.java.binding.Binding))
150 .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
151 use crate::*;
152 macro_rules! gen_native_method_array {
153 () => {{
154 $crate::for_all_native_methods! {gen_native_method_array}
155 }};
156 ({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
157 [
158 $(
159 $crate::gen_native_method_entry! {
160 Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
161 },
162 )*
163 ]
164 }
165 }
166 env.register_native_methods(binding_class, &gen_native_method_array!())
167 .inspect_err(
168 |e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
169 )?;
170
171 tracing::info!("register native methods for jvm successfully");
172 Ok(())
173}
174
175pub fn load_jvm_memory_stats() -> (usize, usize) {
178 match Jvm::get() {
179 Some(jvm) => {
180 let result: Result<(usize, usize), anyhow::Error> = try {
181 execute_with_jni_env(jvm, |env| {
182 let runtime_instance = crate::call_static_method!(
183 env,
184 {Runtime},
185 {Runtime getRuntime()}
186 )?;
187
188 let total_memory =
189 call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
190 let free_memory =
191 call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
192
193 Ok((total_memory as usize, (total_memory - free_memory) as usize))
194 })?
195 };
196 match result {
197 Ok(ret) => ret,
198 Err(e) => {
199 error!(error = ?e.as_report(), "failed to collect jvm stats");
200 (0, 0)
201 }
202 }
203 }
204 _ => (0, 0),
205 }
206}
207
208pub fn execute_with_jni_env<T>(
209 jvm: Jvm,
210 f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
211) -> anyhow::Result<T> {
212 let mut env = jvm
213 .attach_current_thread()
214 .with_context(|| "Failed to attach current rust thread to jvm")?;
215
216 let thread = crate::call_static_method!(
221 env,
222 {Thread},
223 {Thread currentThread()}
224 )?;
225
226 let system_class_loader = crate::call_static_method!(
227 env,
228 {ClassLoader},
229 {ClassLoader getSystemClassLoader()}
230 )?;
231
232 crate::call_method!(
233 env,
234 thread,
235 {void setContextClassLoader(ClassLoader)},
236 &system_class_loader
237 )?;
238
239 let ret = f(&mut env);
240
241 match env.exception_check() {
242 Ok(true) => {
243 let exception = env.exception_occurred().inspect_err(|e| {
244 tracing::warn!(error = %e.as_report(), "Failed to get jvm exception");
245 })?;
246 env.exception_describe().inspect_err(|e| {
247 tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
248 })?;
249 env.exception_clear().inspect_err(|e| {
250 tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
251 })?;
252 let message = call_method!(env, exception, {String getMessage()})?;
253 let message = jobj_to_str(&mut env, message)?;
254 return Err(anyhow::anyhow!("Caught Java Exception: {}", message));
255 }
256 Ok(false) => {
257 }
259 Err(e) => {
260 tracing::warn!(error = %e.as_report(), "Failed to check exception");
261 }
262 }
263
264 ret
265}
266
267pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
269 if !env.is_instance_of(&obj, "java/lang/String")? {
270 bail!("Input object is not a java string and can't be converted!")
271 }
272 let jstr = JString::from(obj);
273 let java_str = env.get_string(&jstr)?;
274 Ok(java_str.to_str()?.to_owned())
275}
276
277pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
285 match Jvm::get() {
286 None => Ok(None),
287 Some(jvm) => execute_with_jni_env(jvm, |env| {
288 let result = call_static_method!(
289 env,
290 {com.risingwave.connector.api.Monitor},
291 {String dumpStackTrace()}
292 )
293 .with_context(|| "Failed to call Java function")?;
294 let result = JString::from(result);
295 let result = env
296 .get_string(&result)
297 .with_context(|| "Failed to convert JString")?;
298 let result = result
299 .to_str()
300 .with_context(|| "Failed to convert JavaStr")?;
301 Ok(Some(result.to_owned()))
302 }),
303 }
304}
305
306#[linkme::distributed_slice(JVM_BUILDER)]
308static REGISTERED_JVM_BUILDER: JvmBuilder = build_jvm_with_native_registration;