risingwave_jni_core/
jvm_runtime.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::c_void;
16use std::path::PathBuf;
17
18use anyhow::{Context, bail};
19use fs_err as fs;
20use fs_err::PathExt;
21use jni::objects::{JObject, JString};
22use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
23use risingwave_common::global_jvm::{JVM_BUILDER, Jvm, JvmBuilder};
24use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
25use thiserror_ext::AsReport;
26use tracing::error;
27
28use crate::opendal_schema_history::*;
29use crate::{call_method, call_static_method};
30
31/// Use 10% of compute total memory by default. Compute node uses 0.7 * system memory by default.
32const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
33
34fn locate_libs_path() -> anyhow::Result<PathBuf> {
35    let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
36        PathBuf::from(libs_path)
37    } else {
38        tracing::info!(
39            "environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"
40        );
41        std::env::current_exe()
42            .and_then(|p| p.fs_err_canonicalize()) // resolve symlink of the current executable
43            .context("unable to get path of the executable")?
44            .parent()
45            .expect("not root")
46            .join("libs")
47    };
48    Ok(libs_path)
49}
50
51fn build_jvm_with_native_registration() -> anyhow::Result<JavaVM> {
52    let libs_path = locate_libs_path().context("failed to locate connector libs")?;
53    tracing::info!(path = %libs_path.display(), "located connector libs");
54
55    let mut class_vec = vec![];
56
57    let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) {
58        "failed to read connector libs; \
59        for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure`"
60    } else {
61        "failed to read connector libs, \
62        please check if env var CONNECTOR_LIBS_PATH is correctly configured"
63    })?;
64    for entry in entries.flatten() {
65        let entry_path = entry.path();
66        if entry_path.file_name().is_some() {
67            let path = fs::canonicalize(entry_path)
68                .expect("invalid entry_path obtained from fs::read_dir");
69            class_vec.push(path.to_str().unwrap().to_owned());
70        }
71    }
72
73    // move risingwave-source-cdc to the head of classpath, because we have some patched Debezium classes
74    // in this jar which needs to be loaded first.
75    let mut new_class_vec = Vec::with_capacity(class_vec.len());
76    for path in class_vec {
77        if path.contains("risingwave-source-cdc") {
78            new_class_vec.insert(0, path.clone());
79        } else {
80            new_class_vec.push(path.clone());
81        }
82    }
83    class_vec = new_class_vec;
84
85    let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
86        heap_size
87    } else {
88        format!(
89            "{}",
90            (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
91        )
92    };
93
94    // FIXME: passing custom arguments to the embedded jvm when compute node start
95    // Build the VM properties
96    let args_builder = InitArgsBuilder::new()
97        // Pass the JNI API version (default is 8)
98        .version(JNIVersion::V8)
99        .option("-Dis_embedded_connector=true")
100        .option(format!("-Djava.class.path={}", class_vec.join(":")))
101        .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
102        .option("-Xms16m")
103        .option(format!("-Xmx{}", jvm_heap_size));
104
105    tracing::info!("JVM args: {:?}", args_builder);
106    let jvm_args = args_builder.build().context("invalid jvm args")?;
107
108    // Create a new VM
109    let jvm = match JavaVM::new(jvm_args) {
110        Err(err) => {
111            tracing::error!(error = ?err.as_report(), "fail to new JVM");
112            bail!("fail to new JVM");
113        }
114        Ok(jvm) => jvm,
115    };
116
117    tracing::info!("initialize JVM successfully");
118
119    let result: std::result::Result<(), jni::errors::Error> = try {
120        let mut env = jvm_env(&jvm)?;
121        register_java_binding_native_methods(&mut env)?;
122    };
123
124    result.context("failed to register native method")?;
125
126    Ok(jvm)
127}
128
129pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
130    jvm.attach_current_thread()
131        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
132}
133
134pub fn register_java_binding_native_methods(
135    env: &mut JNIEnv<'_>,
136) -> Result<(), jni::errors::Error> {
137    let binding_class = env
138        .find_class(gen_class_name!(com.risingwave.java.binding.Binding))
139        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
140    use crate::*;
141    macro_rules! gen_native_method_array {
142        () => {{
143            $crate::for_all_native_methods! {gen_native_method_array}
144        }};
145        ({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
146            [
147                $(
148                    $crate::gen_native_method_entry! {
149                        Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
150                    },
151                )*
152            ]
153        }
154    }
155    env.register_native_methods(binding_class, &gen_native_method_array!())
156        .inspect_err(
157            |e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
158        )?;
159
160    tracing::info!("register native methods for jvm successfully");
161    Ok(())
162}
163
164/// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize,
165/// return zero.
166pub fn load_jvm_memory_stats() -> (usize, usize) {
167    match Jvm::get() {
168        Some(jvm) => {
169            let result: Result<(usize, usize), anyhow::Error> = try {
170                execute_with_jni_env(jvm, |env| {
171                    let runtime_instance = crate::call_static_method!(
172                        env,
173                        {Runtime},
174                        {Runtime getRuntime()}
175                    )?;
176
177                    let total_memory =
178                        call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
179                    let free_memory =
180                        call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
181
182                    Ok((total_memory as usize, (total_memory - free_memory) as usize))
183                })?
184            };
185            match result {
186                Ok(ret) => ret,
187                Err(e) => {
188                    error!(error = ?e.as_report(), "failed to collect jvm stats");
189                    (0, 0)
190                }
191            }
192        }
193        _ => (0, 0),
194    }
195}
196
197pub fn execute_with_jni_env<T>(
198    jvm: Jvm,
199    f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
200) -> anyhow::Result<T> {
201    let mut env = jvm
202        .attach_current_thread()
203        .with_context(|| "Failed to attach current rust thread to jvm")?;
204
205    // set context class loader for the thread
206    // java.lang.Thread.currentThread()
207    //     .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
208
209    let thread = crate::call_static_method!(
210        env,
211        {Thread},
212        {Thread currentThread()}
213    )?;
214
215    let system_class_loader = crate::call_static_method!(
216        env,
217        {ClassLoader},
218        {ClassLoader getSystemClassLoader()}
219    )?;
220
221    crate::call_method!(
222        env,
223        thread,
224        {void setContextClassLoader(ClassLoader)},
225        &system_class_loader
226    )?;
227
228    let ret = f(&mut env);
229
230    match env.exception_check() {
231        Ok(true) => {
232            let exception = env.exception_occurred().inspect_err(|e| {
233                tracing::warn!(error = %e.as_report(), "Failed to get jvm exception");
234            })?;
235            env.exception_describe().inspect_err(|e| {
236                tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
237            })?;
238            env.exception_clear().inspect_err(|e| {
239                tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
240            })?;
241            let message = call_method!(env, exception, {String getMessage()})?;
242            let message = jobj_to_str(&mut env, message)?;
243            return Err(anyhow::anyhow!("Caught Java Exception: {}", message));
244        }
245        Ok(false) => {
246            // No exception, do nothing
247        }
248        Err(e) => {
249            tracing::warn!(error = %e.as_report(), "Failed to check exception");
250        }
251    }
252
253    ret
254}
255
256/// A helper method to convert an java object to rust string.
257pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
258    if !env.is_instance_of(&obj, "java/lang/String")? {
259        bail!("Input object is not a java string and can't be converted!")
260    }
261    let jstr = JString::from(obj);
262    let java_str = env.get_string(&jstr)?;
263    Ok(java_str.to_str()?.to_owned())
264}
265
266/// Dumps the JVM stack traces.
267///
268/// # Returns
269///
270/// - `Ok(None)` if JVM is not initialized.
271/// - `Ok(Some(String))` if JVM is initialized and stack traces are dumped.
272/// - `Err` if failed to dump stack traces.
273pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
274    match Jvm::get() {
275        None => Ok(None),
276        Some(jvm) => execute_with_jni_env(jvm, |env| {
277            let result = call_static_method!(
278                env,
279                {com.risingwave.connector.api.Monitor},
280                {String dumpStackTrace()}
281            )
282            .with_context(|| "Failed to call Java function")?;
283            let result = JString::from(result);
284            let result = env
285                .get_string(&result)
286                .with_context(|| "Failed to convert JString")?;
287            let result = result
288                .to_str()
289                .with_context(|| "Failed to convert JavaStr")?;
290            Ok(Some(result.to_owned()))
291        }),
292    }
293}
294
295/// Register the JVM initialization closure.
296#[linkme::distributed_slice(JVM_BUILDER)]
297static REGISTERED_JVM_BUILDER: JvmBuilder = build_jvm_with_native_registration;