risingwave_jni_core/
jvm_runtime.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::c_void;
16use std::path::PathBuf;
17use std::sync::OnceLock;
18
19use anyhow::{Context, bail};
20use fs_err as fs;
21use fs_err::PathExt;
22use jni::objects::{JObject, JString};
23use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
24use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
25use thiserror_ext::AsReport;
26use tracing::error;
27
28use crate::{call_method, call_static_method};
29
30/// Use 10% of compute total memory by default. Compute node uses 0.7 * system memory by default.
31const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
32
33pub static JVM: JavaVmWrapper = JavaVmWrapper;
34static INSTANCE: OnceLock<JavaVM> = OnceLock::new();
35
36pub struct JavaVmWrapper;
37
38impl JavaVmWrapper {
39    /// Get the initialized JVM instance. If JVM is not initialized, initialize it first.
40    /// If JVM cannot be initialized, return an error.
41    pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> {
42        match INSTANCE.get_or_try_init(Self::inner_new) {
43            Ok(jvm) => Ok(jvm),
44            Err(e) => {
45                error!(error = %e.as_report(), "jvm not initialized properly");
46                Err(e.context("jvm not initialized properly"))
47            }
48        }
49    }
50
51    /// Get the initialized JVM instance. If JVM is not initialized, return None.
52    ///
53    /// Generally `get_or_init` should be preferred.
54    fn get(&self) -> Option<&'static JavaVM> {
55        INSTANCE.get()
56    }
57
58    fn locate_libs_path() -> anyhow::Result<PathBuf> {
59        let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
60            PathBuf::from(libs_path)
61        } else {
62            tracing::info!(
63                "environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"
64            );
65            std::env::current_exe()
66                .and_then(|p| p.fs_err_canonicalize()) // resolve symlink of the current executable
67                .context("unable to get path of the executable")?
68                .parent()
69                .expect("not root")
70                .join("libs")
71        };
72
73        // No need to validate the path now, as it will be further checked when calling `fs::read_dir` later.
74        Ok(libs_path)
75    }
76
77    fn inner_new() -> anyhow::Result<JavaVM> {
78        let libs_path = Self::locate_libs_path().context("failed to locate connector libs")?;
79        tracing::info!(path = %libs_path.display(), "located connector libs");
80
81        let mut class_vec = vec![];
82
83        let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) {
84            "failed to read connector libs; \
85            for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure`
86            "
87        } else {
88            "failed to read connector libs, \
89            please check if env var CONNECTOR_LIBS_PATH is correctly configured"
90        })?;
91        for entry in entries.flatten() {
92            let entry_path = entry.path();
93            if entry_path.file_name().is_some() {
94                let path = fs::canonicalize(entry_path)
95                    .expect("invalid entry_path obtained from fs::read_dir");
96                class_vec.push(path.to_str().unwrap().to_owned());
97            }
98        }
99
100        // move risingwave-source-cdc to the head of classpath, because we have some patched Debezium classes
101        // in this jar which needs to be loaded first.
102        let mut new_class_vec = Vec::with_capacity(class_vec.len());
103        for path in class_vec {
104            if path.contains("risingwave-source-cdc") {
105                new_class_vec.insert(0, path.clone());
106            } else {
107                new_class_vec.push(path.clone());
108            }
109        }
110        class_vec = new_class_vec;
111
112        let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
113            heap_size
114        } else {
115            format!(
116                "{}",
117                (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
118            )
119        };
120
121        // FIXME: passing custom arguments to the embedded jvm when compute node start
122        // Build the VM properties
123        let args_builder = InitArgsBuilder::new()
124            // Pass the JNI API version (default is 8)
125            .version(JNIVersion::V8)
126            .option("-Dis_embedded_connector=true")
127            .option(format!("-Djava.class.path={}", class_vec.join(":")))
128            .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
129            .option("-Xms16m")
130            .option(format!("-Xmx{}", jvm_heap_size));
131
132        tracing::info!("JVM args: {:?}", args_builder);
133        let jvm_args = args_builder.build().context("invalid jvm args")?;
134
135        // Create a new VM
136        let jvm = match JavaVM::new(jvm_args) {
137            Err(err) => {
138                tracing::error!(error = ?err.as_report(), "fail to new JVM");
139                bail!("fail to new JVM");
140            }
141            Ok(jvm) => jvm,
142        };
143
144        tracing::info!("initialize JVM successfully");
145
146        let result: std::result::Result<(), jni::errors::Error> = try {
147            let mut env = jvm_env(&jvm)?;
148            register_java_binding_native_methods(&mut env)?;
149        };
150
151        result.context("failed to register native method")?;
152
153        Ok(jvm)
154    }
155}
156
157pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
158    jvm.attach_current_thread()
159        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
160}
161
162pub fn register_java_binding_native_methods(
163    env: &mut JNIEnv<'_>,
164) -> Result<(), jni::errors::Error> {
165    let binding_class = env
166        .find_class(gen_class_name!(com.risingwave.java.binding.Binding))
167        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
168    use crate::*;
169    macro_rules! gen_native_method_array {
170        () => {{
171            $crate::for_all_native_methods! {gen_native_method_array}
172        }};
173        ({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
174            [
175                $(
176                    $crate::gen_native_method_entry! {
177                        Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
178                    },
179                )*
180            ]
181        }
182    }
183    env.register_native_methods(binding_class, &gen_native_method_array!())
184        .inspect_err(
185            |e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
186        )?;
187
188    tracing::info!("register native methods for jvm successfully");
189    Ok(())
190}
191
192/// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize,
193/// return zero.
194pub fn load_jvm_memory_stats() -> (usize, usize) {
195    match JVM.get() {
196        Some(jvm) => {
197            let result: Result<(usize, usize), anyhow::Error> = try {
198                execute_with_jni_env(jvm, |env| {
199                    let runtime_instance = crate::call_static_method!(
200                        env,
201                        {Runtime},
202                        {Runtime getRuntime()}
203                    )?;
204
205                    let total_memory =
206                        call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
207                    let free_memory =
208                        call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
209
210                    Ok((total_memory as usize, (total_memory - free_memory) as usize))
211                })?
212            };
213            match result {
214                Ok(ret) => ret,
215                Err(e) => {
216                    error!(error = ?e.as_report(), "failed to collect jvm stats");
217                    (0, 0)
218                }
219            }
220        }
221        _ => (0, 0),
222    }
223}
224
225pub fn execute_with_jni_env<T>(
226    jvm: &JavaVM,
227    f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
228) -> anyhow::Result<T> {
229    let mut env = jvm
230        .attach_current_thread()
231        .with_context(|| "Failed to attach current rust thread to jvm")?;
232
233    // set context class loader for the thread
234    // java.lang.Thread.currentThread()
235    //     .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
236
237    let thread = crate::call_static_method!(
238        env,
239        {Thread},
240        {Thread currentThread()}
241    )?;
242
243    let system_class_loader = crate::call_static_method!(
244        env,
245        {ClassLoader},
246        {ClassLoader getSystemClassLoader()}
247    )?;
248
249    crate::call_method!(
250        env,
251        thread,
252        {void setContextClassLoader(ClassLoader)},
253        &system_class_loader
254    )?;
255
256    let ret = f(&mut env);
257
258    match env.exception_check() {
259        Ok(true) => {
260            let exception = env.exception_occurred().inspect_err(|e| {
261                tracing::warn!(error = %e.as_report(), "Failed to get jvm exception");
262            })?;
263            env.exception_describe().inspect_err(|e| {
264                tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
265            })?;
266            env.exception_clear().inspect_err(|e| {
267                tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
268            })?;
269            let message = call_method!(env, exception, {String getMessage()})?;
270            let message = jobj_to_str(&mut env, message)?;
271            return Err(anyhow::anyhow!("Caught Java Exception: {}", message));
272        }
273        Ok(false) => {
274            // No exception, do nothing
275        }
276        Err(e) => {
277            tracing::warn!(error = %e.as_report(), "Failed to check exception");
278        }
279    }
280
281    ret
282}
283
284/// A helper method to convert an java object to rust string.
285pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
286    if !env.is_instance_of(&obj, "java/lang/String")? {
287        bail!("Input object is not a java string and can't be converted!")
288    }
289    let jstr = JString::from(obj);
290    let java_str = env.get_string(&jstr)?;
291    Ok(java_str.to_str()?.to_owned())
292}
293
294/// Dumps the JVM stack traces.
295///
296/// # Returns
297///
298/// - `Ok(None)` if JVM is not initialized.
299/// - `Ok(Some(String))` if JVM is initialized and stack traces are dumped.
300/// - `Err` if failed to dump stack traces.
301pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
302    match JVM.get() {
303        None => Ok(None),
304        Some(jvm) => execute_with_jni_env(jvm, |env| {
305            let result = call_static_method!(
306                env,
307                {com.risingwave.connector.api.Monitor},
308                {String dumpStackTrace()}
309            )
310            .with_context(|| "Failed to call Java function")?;
311            let result = JString::from(result);
312            let result = env
313                .get_string(&result)
314                .with_context(|| "Failed to convert JString")?;
315            let result = result
316                .to_str()
317                .with_context(|| "Failed to convert JavaStr")?;
318            Ok(Some(result.to_owned()))
319        }),
320    }
321}