risingwave_jni_core/
jvm_runtime.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::c_void;
16use std::path::PathBuf;
17
18use anyhow::{Context, bail};
19use fs_err as fs;
20use fs_err::PathExt;
21use jni::objects::{JObject, JString};
22use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
23use risingwave_common::global_jvm::{JVM_BUILDER, Jvm, JvmBuilder};
24use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
25use thiserror_ext::AsReport;
26use tracing::error;
27
28use crate::opendal_schema_history::*;
29use crate::{call_method, call_static_method};
30
/// Fraction of the available system memory used as the default JVM max heap size (`-Xmx`)
/// when the `JVM_HEAP_SIZE` environment variable is not set.
///
/// Use 10% of compute total memory by default. Compute node uses 0.7 * system memory by
/// default, hence `0.1 * 0.7 = 0.07` of the system memory.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
33
34fn locate_libs_path() -> anyhow::Result<PathBuf> {
35    let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
36        PathBuf::from(libs_path)
37    } else {
38        tracing::info!(
39            "environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"
40        );
41        std::env::current_exe()
42            .and_then(|p| p.fs_err_canonicalize()) // resolve symlink of the current executable
43            .context("unable to get path of the executable")?
44            .parent()
45            .expect("not root")
46            .join("libs")
47    };
48    Ok(libs_path)
49}
50
51fn build_jvm_with_native_registration() -> anyhow::Result<JavaVM> {
52    let libs_path = locate_libs_path().context("failed to locate connector libs")?;
53    tracing::info!(path = %libs_path.display(), "located connector libs");
54
55    let mut class_vec = vec![];
56
57    let entries = fs::read_dir(&libs_path).context(if cfg!(debug_assertions) {
58        "failed to read connector libs; \
59        for RiseDev users, please check if ENABLE_BUILD_RW_CONNECTOR is set with `risedev configure`"
60    } else {
61        "failed to read connector libs, \
62        please check if env var CONNECTOR_LIBS_PATH is correctly configured"
63    })?;
64    for entry in entries.flatten() {
65        let entry_path = entry.path();
66        if entry_path.file_name().is_some() {
67            let path = fs::canonicalize(entry_path)
68                .expect("invalid entry_path obtained from fs::read_dir");
69            class_vec.push(path.to_str().unwrap().to_owned());
70        }
71    }
72
73    // move risingwave-source-cdc to the head of classpath, because we have some patched Debezium classes
74    // in this jar which needs to be loaded first.
75    let mut new_class_vec = Vec::with_capacity(class_vec.len());
76    for path in class_vec {
77        if path.contains("risingwave-source-cdc") {
78            new_class_vec.insert(0, path.clone());
79        } else {
80            new_class_vec.push(path.clone());
81        }
82    }
83    class_vec = new_class_vec;
84
85    let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
86        heap_size
87    } else {
88        format!(
89            "{}",
90            (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
91        )
92    };
93
94    // FIXME: passing custom arguments to the embedded jvm when compute node start
95    // Build the VM properties
96    let mut args_builder = InitArgsBuilder::new()
97        // Pass the JNI API version (default is 8)
98        .version(JNIVersion::V8)
99        .option("-Dis_embedded_connector=true")
100        .option(format!("-Djava.class.path={}", class_vec.join(":")))
101        .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
102        .option("-Xms16m")
103        .option(format!("-Xmx{}", jvm_heap_size));
104
105    // Exit the embedded JVM on uncaught `OutOfMemoryError` so that the host
106    // compute node process terminates and the cluster recovers via the standard
107    // recovery loop, matching native OOM behavior. Set `JVM_EXIT_ON_OOM=false`
108    // (or `0`) to opt out.
109    let exit_on_oom = std::env::var("JVM_EXIT_ON_OOM")
110        .map(|v| !matches!(v.to_ascii_lowercase().as_str(), "false" | "0" | ""))
111        .unwrap_or(true);
112    if exit_on_oom {
113        args_builder = args_builder.option("-XX:+ExitOnOutOfMemoryError");
114    }
115
116    tracing::info!("JVM args: {:?}", args_builder);
117    let jvm_args = args_builder.build().context("invalid jvm args")?;
118
119    // Create a new VM
120    let jvm = match JavaVM::new(jvm_args) {
121        Err(err) => {
122            tracing::error!(error = ?err.as_report(), "fail to new JVM");
123            bail!("fail to new JVM");
124        }
125        Ok(jvm) => jvm,
126    };
127
128    tracing::info!("initialize JVM successfully");
129
130    let result: std::result::Result<(), jni::errors::Error> = try {
131        let mut env = jvm_env(&jvm)?;
132        register_java_binding_native_methods(&mut env)?;
133    };
134
135    result.context("failed to register native method")?;
136
137    Ok(jvm)
138}
139
140pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
141    jvm.attach_current_thread()
142        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
143}
144
145pub fn register_java_binding_native_methods(
146    env: &mut JNIEnv<'_>,
147) -> Result<(), jni::errors::Error> {
148    let binding_class = env
149        .find_class(gen_class_name!(com.risingwave.java.binding.Binding))
150        .inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
151    use crate::*;
152    macro_rules! gen_native_method_array {
153        () => {{
154            $crate::for_all_native_methods! {gen_native_method_array}
155        }};
156        ({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
157            [
158                $(
159                    $crate::gen_native_method_entry! {
160                        Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
161                    },
162                )*
163            ]
164        }
165    }
166    env.register_native_methods(binding_class, &gen_native_method_array!())
167        .inspect_err(
168            |e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
169        )?;
170
171    tracing::info!("register native methods for jvm successfully");
172    Ok(())
173}
174
175/// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize,
176/// return zero.
177pub fn load_jvm_memory_stats() -> (usize, usize) {
178    match Jvm::get() {
179        Some(jvm) => {
180            let result: Result<(usize, usize), anyhow::Error> = try {
181                execute_with_jni_env(jvm, |env| {
182                    let runtime_instance = crate::call_static_method!(
183                        env,
184                        {Runtime},
185                        {Runtime getRuntime()}
186                    )?;
187
188                    let total_memory =
189                        call_method!(env, runtime_instance.as_ref(), {long totalMemory()})?;
190                    let free_memory =
191                        call_method!(env, runtime_instance.as_ref(), {long freeMemory()})?;
192
193                    Ok((total_memory as usize, (total_memory - free_memory) as usize))
194                })?
195            };
196            match result {
197                Ok(ret) => ret,
198                Err(e) => {
199                    error!(error = ?e.as_report(), "failed to collect jvm stats");
200                    (0, 0)
201                }
202            }
203        }
204        _ => (0, 0),
205    }
206}
207
208pub fn execute_with_jni_env<T>(
209    jvm: Jvm,
210    f: impl FnOnce(&mut JNIEnv<'_>) -> anyhow::Result<T>,
211) -> anyhow::Result<T> {
212    let mut env = jvm
213        .attach_current_thread()
214        .with_context(|| "Failed to attach current rust thread to jvm")?;
215
216    // set context class loader for the thread
217    // java.lang.Thread.currentThread()
218    //     .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
219
220    let thread = crate::call_static_method!(
221        env,
222        {Thread},
223        {Thread currentThread()}
224    )?;
225
226    let system_class_loader = crate::call_static_method!(
227        env,
228        {ClassLoader},
229        {ClassLoader getSystemClassLoader()}
230    )?;
231
232    crate::call_method!(
233        env,
234        thread,
235        {void setContextClassLoader(ClassLoader)},
236        &system_class_loader
237    )?;
238
239    let ret = f(&mut env);
240
241    match env.exception_check() {
242        Ok(true) => {
243            let exception = env.exception_occurred().inspect_err(|e| {
244                tracing::warn!(error = %e.as_report(), "Failed to get jvm exception");
245            })?;
246            env.exception_describe().inspect_err(|e| {
247                tracing::warn!(error = %e.as_report(), "Failed to describe jvm exception");
248            })?;
249            env.exception_clear().inspect_err(|e| {
250                tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
251            })?;
252            let message = call_method!(env, exception, {String getMessage()})?;
253            let message = jobj_to_str(&mut env, message)?;
254            return Err(anyhow::anyhow!("Caught Java Exception: {}", message));
255        }
256        Ok(false) => {
257            // No exception, do nothing
258        }
259        Err(e) => {
260            tracing::warn!(error = %e.as_report(), "Failed to check exception");
261        }
262    }
263
264    ret
265}
266
267/// A helper method to convert an java object to rust string.
268pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
269    if !env.is_instance_of(&obj, "java/lang/String")? {
270        bail!("Input object is not a java string and can't be converted!")
271    }
272    let jstr = JString::from(obj);
273    let java_str = env.get_string(&jstr)?;
274    Ok(java_str.to_str()?.to_owned())
275}
276
277/// Dumps the JVM stack traces.
278///
279/// # Returns
280///
281/// - `Ok(None)` if JVM is not initialized.
282/// - `Ok(Some(String))` if JVM is initialized and stack traces are dumped.
283/// - `Err` if failed to dump stack traces.
284pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
285    match Jvm::get() {
286        None => Ok(None),
287        Some(jvm) => execute_with_jni_env(jvm, |env| {
288            let result = call_static_method!(
289                env,
290                {com.risingwave.connector.api.Monitor},
291                {String dumpStackTrace()}
292            )
293            .with_context(|| "Failed to call Java function")?;
294            let result = JString::from(result);
295            let result = env
296                .get_string(&result)
297                .with_context(|| "Failed to convert JString")?;
298            let result = result
299                .to_str()
300                .with_context(|| "Failed to convert JavaStr")?;
301            Ok(Some(result.to_owned()))
302        }),
303    }
304}
305
/// Register the JVM initialization function.
///
/// Adds `build_jvm_with_native_registration` as an element of the `JVM_BUILDER` distributed
/// slice imported from `risingwave_common::global_jvm`.
#[linkme::distributed_slice(JVM_BUILDER)]
static REGISTERED_JVM_BUILDER: JvmBuilder = build_jvm_with_native_registration;