risingwave_jni_core/
jvm_runtime.rsuse std::ffi::c_void;
use std::path::PathBuf;
use std::sync::OnceLock;
use anyhow::{bail, Context};
use fs_err as fs;
use fs_err::PathExt;
use jni::objects::{JObject, JString};
use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use thiserror_ext::AsReport;
use tracing::error;
use crate::{call_method, call_static_method};
const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
pub static JVM: JavaVmWrapper = JavaVmWrapper;
static INSTANCE: OnceLock<JavaVM> = OnceLock::new();
pub struct JavaVmWrapper;
impl JavaVmWrapper {
pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> {
match INSTANCE.get_or_try_init(Self::inner_new) {
Ok(jvm) => Ok(jvm),
Err(e) => {
error!(error = %e.as_report(), "jvm not initialized properly");
Err(e.context("jvm not initialized properly"))
}
}
}
fn get(&self) -> Option<&'static JavaVM> {
INSTANCE.get()
}
fn locate_libs_path() -> anyhow::Result<PathBuf> {
let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
PathBuf::from(libs_path)
} else {
tracing::info!("environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead");
std::env::current_exe()
.and_then(|p| p.fs_err_canonicalize()) .context("unable to get path of the executable")?
.parent()
.expect("not root")
.join("libs")
};
Ok(libs_path)
}
fn inner_new() -> anyhow::Result<JavaVM> {
let libs_path = Self::locate_libs_path().context("failed to locate connector libs")?;
tracing::info!(path = %libs_path.display(), "located connector libs");
let mut class_vec = vec![];
let entries = fs::read_dir(&libs_path).context("failed to read connector libs")?;
for entry in entries.flatten() {
let entry_path = entry.path();
if entry_path.file_name().is_some() {
let path = fs::canonicalize(entry_path)
.expect("invalid entry_path obtained from fs::read_dir");
class_vec.push(path.to_str().unwrap().to_string());
}
}
let mut new_class_vec = Vec::with_capacity(class_vec.len());
for path in class_vec {
if path.contains("risingwave-source-cdc") {
new_class_vec.insert(0, path.clone());
} else {
new_class_vec.push(path.clone());
}
}
class_vec = new_class_vec;
let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
heap_size
} else {
format!(
"{}",
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
)
};
let args_builder = InitArgsBuilder::new()
.version(JNIVersion::V8)
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));
tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder.build().context("invalid jvm args")?;
let jvm = match JavaVM::new(jvm_args) {
Err(err) => {
tracing::error!(error = ?err.as_report(), "fail to new JVM");
bail!("fail to new JVM");
}
Ok(jvm) => jvm,
};
tracing::info!("initialize JVM successfully");
let result: std::result::Result<(), jni::errors::Error> = try {
let mut env = jvm_env(&jvm)?;
register_java_binding_native_methods(&mut env)?;
};
result.context("failed to register native method")?;
Ok(jvm)
}
}
pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
jvm.attach_current_thread()
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
}
pub fn register_java_binding_native_methods(
env: &mut JNIEnv<'_>,
) -> Result<(), jni::errors::Error> {
let binding_class = env
.find_class(gen_class_name!(com.risingwave.java.binding.Binding))
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
use crate::*;
macro_rules! gen_native_method_array {
() => {{
$crate::for_all_native_methods! {gen_native_method_array}
}};
({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
[
$(
$crate::gen_native_method_entry! {
Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
},
)*
]
}
}
env.register_native_methods(binding_class, &gen_native_method_array!())
.inspect_err(
|e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
)?;
tracing::info!("register native methods for jvm successfully");
Ok(())
}
pub fn load_jvm_memory_stats() -> (usize, usize) {
match JVM.get() {
Some(jvm) => {
let result: Result<(usize, usize), anyhow::Error> = try {
execute_with_jni_env(jvm, |env| {
let runtime_instance = crate::call_static_method!(
env,
{Runtime},
{Runtime getRuntime()}
)?;
let total_memory =
call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
let free_memory =
call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
Ok((total_memory as usize, (total_memory - free_memory) as usize))
})?
};
match result {
Ok(ret) => ret,
Err(e) => {
error!(error = ?e.as_report(), "failed to collect jvm stats");
(0, 0)
}
}
}
_ => (0, 0),
}
}
pub fn execute_with_jni_env<T>(
jvm: &JavaVM,
f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
) -> anyhow::Result<T> {
let mut env = jvm
.attach_current_thread()
.with_context(|| "Failed to attach current rust thread to jvm")?;
let thread = crate::call_static_method!(
env,
{Thread},
{Thread currentThread()}
)?;
let system_class_loader = crate::call_static_method!(
env,
{ClassLoader},
{ClassLoader getSystemClassLoader()}
)?;
crate::call_method!(
env,
thread,
{void setContextClassLoader(ClassLoader)},
&system_class_loader
)?;
let ret = f(&mut env);
match env.exception_check() {
Ok(true) => {
env.exception_describe().inspect_err(|e| {
tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
})?;
env.exception_clear().inspect_err(|e| {
tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
})?;
}
Ok(false) => {
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to check exception");
}
}
ret
}
pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
if !env.is_instance_of(&obj, "java/lang/String")? {
bail!("Input object is not a java string and can't be converted!")
}
let jstr = JString::from(obj);
let java_str = env.get_string(&jstr)?;
Ok(java_str.to_str()?.to_string())
}
pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
match JVM.get() {
None => Ok(None),
Some(jvm) => execute_with_jni_env(jvm, |env| {
let result = call_static_method!(
env,
{com.risingwave.connector.api.Monitor},
{String dumpStackTrace()}
)
.with_context(|| "Failed to call Java function")?;
let result = JString::from(result);
let result = env
.get_string(&result)
.with_context(|| "Failed to convert JString")?;
let result = result
.to_str()
.with_context(|| "Failed to convert JavaStr")?;
Ok(Some(result.to_string()))
}),
}
}