risingwave_jni_core/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(error_generic_member_access)]
16#![feature(once_cell_try)]
17#![feature(type_alias_impl_trait)]
18#![feature(try_blocks)]
19#![feature(used_with_arg)]
20
21pub mod jvm_runtime;
22mod macros;
23mod opendal_schema_history;
24mod tracing_slf4j;
25
26use std::backtrace::Backtrace;
27use std::marker::PhantomData;
28use std::ops::{Deref, DerefMut};
29use std::slice::from_raw_parts;
30use std::sync::{LazyLock, OnceLock};
31
32use anyhow::anyhow;
33use bytes::Bytes;
34use cfg_or_panic::cfg_or_panic;
35use chrono::{DateTime, Datelike, Timelike};
36use futures::TryStreamExt;
37use futures::stream::BoxStream;
38use jni::JNIEnv;
39use jni::objects::{
40    AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString,
41    JValueOwned, ReleaseMode,
42};
43use jni::signature::ReturnType;
44use jni::sys::{
45    JNI_FALSE, JNI_TRUE, jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue,
46};
47pub use paste::paste;
48use prost::{DecodeError, Message};
49use risingwave_common::array::{ArrayError, StreamChunk};
50use risingwave_common::hash::VirtualNode;
51use risingwave_common::row::{OwnedRow, Row};
52use risingwave_common::test_prelude::StreamChunkTestExt;
53use risingwave_common::types::{Decimal, ScalarRefImpl};
54use risingwave_common::util::panic::rw_catch_unwind;
55use risingwave_pb::connector_service::{
56    GetEventStreamResponse, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse,
57    SinkWriterStreamRequest, SinkWriterStreamResponse,
58};
59use risingwave_pb::data::Op;
60use thiserror::Error;
61use thiserror_ext::AsReport;
62use tokio::runtime::Runtime;
63use tokio::sync::mpsc::{Receiver, Sender};
64use tracing_slf4j::*;
65
66/// Enable JVM and Java libraries.
67///
68/// This macro forces this crate to be linked, which registers the JVM builder.
69#[macro_export]
70macro_rules! enable {
71    () => {
72        use risingwave_jni_core as _;
73    };
74}
75
76pub static JAVA_BINDING_ASYNC_RUNTIME: LazyLock<Runtime> =
77    LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
78
79#[derive(Error, Debug)]
80pub enum BindingError {
81    #[error("JniError {error}")]
82    Jni {
83        #[from]
84        error: jni::errors::Error,
85        backtrace: Backtrace,
86    },
87
88    #[error("StorageError {error}")]
89    Storage {
90        #[from]
91        error: anyhow::Error,
92        backtrace: Backtrace,
93    },
94
95    #[error("DecodeError {error}")]
96    Decode {
97        #[from]
98        error: DecodeError,
99        backtrace: Backtrace,
100    },
101
102    #[error("StreamChunkArrayError {error}")]
103    StreamChunkArray {
104        #[from]
105        error: ArrayError,
106        backtrace: Backtrace,
107    },
108}
109
110type Result<T> = std::result::Result<T, BindingError>;
111
112pub fn to_guarded_slice<'array, 'env>(
113    array: &'array JByteArray<'env>,
114    env: &'array mut JNIEnv<'env>,
115) -> Result<SliceGuard<'env, 'array>> {
116    unsafe {
117        let array = env.get_array_elements(array, ReleaseMode::NoCopyBack)?;
118        let slice = from_raw_parts(array.as_ptr() as *mut u8, array.len());
119
120        Ok(SliceGuard {
121            _array: array,
122            slice,
123        })
124    }
125}
126
127/// Wrapper around `&[u8]` derived from `jbyteArray` to prevent it from being auto-released.
128pub struct SliceGuard<'env, 'array> {
129    _array: AutoElements<'env, 'env, 'array, jbyte>,
130    slice: &'array [u8],
131}
132
133impl Deref for SliceGuard<'_, '_> {
134    type Target = [u8];
135
136    fn deref(&self) -> &Self::Target {
137        self.slice
138    }
139}
140
141#[repr(transparent)]
142pub struct Pointer<'a, T> {
143    pointer: jlong,
144    _phantom: PhantomData<&'a T>,
145}
146
147impl<T> Default for Pointer<'_, T> {
148    fn default() -> Self {
149        Self {
150            pointer: 0,
151            _phantom: Default::default(),
152        }
153    }
154}
155
156impl<T> From<T> for Pointer<'static, T> {
157    fn from(value: T) -> Self {
158        Pointer {
159            pointer: Box::into_raw(Box::new(value)) as jlong,
160            _phantom: PhantomData,
161        }
162    }
163}
164
165impl<'a, T> Pointer<'a, T> {
166    fn as_ref(&self) -> &'a T {
167        assert!(self.pointer != 0);
168        unsafe { &*(self.pointer as *const T) }
169    }
170
171    fn as_mut(&mut self) -> &'a mut T {
172        assert!(self.pointer != 0);
173        unsafe { &mut *(self.pointer as *mut T) }
174    }
175}
176
177/// A pointer that owns the object it points to.
178///
179/// Note that dropping an `OwnedPointer` does not release the object.
180/// Instead, you should call [`OwnedPointer::release`] manually.
181pub type OwnedPointer<T> = Pointer<'static, T>;
182
183impl<T> OwnedPointer<T> {
184    /// Consume `self` and return the pointer value. Used for passing to JNI.
185    pub fn into_pointer(self) -> jlong {
186        self.pointer
187    }
188
189    /// Release the object behind the pointer.
190    fn release(self) {
191        tracing::debug!(
192            type_name = std::any::type_name::<T>(),
193            address = %format_args!("{:x}", self.pointer),
194            "release jni OwnedPointer"
195        );
196        assert!(self.pointer != 0);
197        unsafe { drop(Box::from_raw(self.pointer as *mut T)) }
198    }
199}
200
201/// In most Jni interfaces, the first parameter is `JNIEnv`, and the second parameter is `JClass`.
202/// This struct simply encapsulates the two common parameters into a single struct for simplicity.
203#[repr(C)]
204pub struct EnvParam<'a> {
205    env: JNIEnv<'a>,
206    class: JClass<'a>,
207}
208
209impl<'a> Deref for EnvParam<'a> {
210    type Target = JNIEnv<'a>;
211
212    fn deref(&self) -> &Self::Target {
213        &self.env
214    }
215}
216
217impl DerefMut for EnvParam<'_> {
218    fn deref_mut(&mut self) -> &mut Self::Target {
219        &mut self.env
220    }
221}
222
223impl<'a> EnvParam<'a> {
224    pub fn get_class(&self) -> &JClass<'a> {
225        &self.class
226    }
227}
228
229pub fn execute_and_catch<'env, F, Ret>(mut env: EnvParam<'env>, inner: F) -> Ret
230where
231    F: FnOnce(&mut EnvParam<'env>) -> Result<Ret>,
232    Ret: Default + 'env,
233{
234    match rw_catch_unwind(std::panic::AssertUnwindSafe(|| inner(&mut env))) {
235        Ok(Ok(ret)) => ret,
236        Ok(Err(e)) => {
237            match e {
238                BindingError::Jni {
239                    error: jni::errors::Error::JavaException,
240                    backtrace,
241                } => {
242                    tracing::error!("get JavaException thrown from: {:?}", backtrace);
243                    // the exception is already thrown. No need to throw again
244                }
245                _ => {
246                    env.throw(format!("get error while processing: {:?}", e.as_report()))
247                        .expect("should be able to throw");
248                }
249            }
250            Ret::default()
251        }
252        Err(e) => {
253            env.throw(format!("panic while processing: {:?}", e))
254                .expect("should be able to throw");
255            Ret::default()
256        }
257    }
258}
259
260#[derive(Default)]
261struct JavaClassMethodCache {
262    big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>,
263
264    timestamp_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
265    timestamptz_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
266    date_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
267    time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
268    instant_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
269    utc: OnceLock<GlobalRef>,
270}
271
272mod opaque_type {
273    use super::*;
274    // TODO: may only return a RowRef
275    pub type StreamChunkRowIterator<'a> = impl Iterator<Item = (Op, OwnedRow)> + 'a;
276
277    impl<'a> JavaBindingIteratorInner<'a> {
278        #[define_opaque(StreamChunkRowIterator)]
279        pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> {
280            JavaBindingIteratorInner::StreamChunk(
281                chunk
282                    .rows()
283                    .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())),
284            )
285        }
286    }
287}
288pub use opaque_type::StreamChunkRowIterator;
289pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>;
290pub enum JavaBindingIteratorInner<'a> {
291    Hummock(HummockJavaBindingIterator),
292    StreamChunk(StreamChunkRowIterator<'a>),
293}
294
295enum RowExtra {
296    Op(Op),
297    Key(Bytes),
298}
299
300impl RowExtra {
301    fn as_op(&self) -> Op {
302        match self {
303            RowExtra::Op(op) => *op,
304            RowExtra::Key(_) => unreachable!("should be op"),
305        }
306    }
307
308    fn as_key(&self) -> &Bytes {
309        match self {
310            RowExtra::Key(key) => key,
311            RowExtra::Op(_) => unreachable!("should be key"),
312        }
313    }
314}
315
316struct RowCursor {
317    row: OwnedRow,
318    extra: RowExtra,
319}
320
321pub struct JavaBindingIterator<'a> {
322    inner: JavaBindingIteratorInner<'a>,
323    cursor: Option<RowCursor>,
324    class_cache: JavaClassMethodCache,
325}
326
327impl JavaBindingIterator<'static> {
328    pub fn new_hummock_iter(iter: HummockJavaBindingIterator) -> Self {
329        Self {
330            inner: JavaBindingIteratorInner::Hummock(iter),
331            cursor: None,
332            class_cache: Default::default(),
333        }
334    }
335}
336
337impl Deref for JavaBindingIterator<'_> {
338    type Target = OwnedRow;
339
340    fn deref(&self) -> &Self::Target {
341        &self
342            .cursor
343            .as_ref()
344            .expect("should exist when call row methods")
345            .row
346    }
347}
348
349#[unsafe(no_mangle)]
350extern "system" fn Java_com_risingwave_java_binding_Binding_defaultVnodeCount(
351    _env: EnvParam<'_>,
352) -> jint {
353    VirtualNode::COUNT_FOR_COMPAT as jint
354}
355
356#[cfg_or_panic(not(madsim))]
357#[unsafe(no_mangle)]
358extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNewStreamChunk<'a>(
359    env: EnvParam<'a>,
360    chunk: Pointer<'a, StreamChunk>,
361) -> Pointer<'static, JavaBindingIterator<'a>> {
362    execute_and_catch(env, move |_env| {
363        let iter = JavaBindingIterator {
364            inner: JavaBindingIteratorInner::from_chunk(chunk.as_ref()),
365            cursor: None,
366            class_cache: Default::default(),
367        };
368        Ok(iter.into())
369    })
370}
371
372#[cfg_or_panic(not(madsim))]
373#[unsafe(no_mangle)]
374extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>(
375    env: EnvParam<'a>,
376    mut pointer: Pointer<'a, JavaBindingIterator<'a>>,
377) -> jboolean {
378    execute_and_catch(env, move |_env| {
379        let iter = pointer.as_mut();
380        match &mut iter.inner {
381            JavaBindingIteratorInner::Hummock(hummock_iter) => {
382                match JAVA_BINDING_ASYNC_RUNTIME.block_on(hummock_iter.try_next())? {
383                    None => {
384                        iter.cursor = None;
385                        Ok(JNI_FALSE)
386                    }
387                    Some((key, row)) => {
388                        iter.cursor = Some(RowCursor {
389                            row,
390                            extra: RowExtra::Key(key),
391                        });
392                        Ok(JNI_TRUE)
393                    }
394                }
395            }
396            JavaBindingIteratorInner::StreamChunk(stream_chunk_iter) => {
397                match stream_chunk_iter.next() {
398                    None => {
399                        iter.cursor = None;
400                        Ok(JNI_FALSE)
401                    }
402                    Some((op, row)) => {
403                        iter.cursor = Some(RowCursor {
404                            row,
405                            extra: RowExtra::Op(op),
406                        });
407                        Ok(JNI_TRUE)
408                    }
409                }
410            }
411        }
412    })
413}
414
415#[unsafe(no_mangle)]
416extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorClose<'a>(
417    _env: EnvParam<'a>,
418    pointer: OwnedPointer<JavaBindingIterator<'a>>,
419) {
420    pointer.release()
421}
422
423#[unsafe(no_mangle)]
424extern "system" fn Java_com_risingwave_java_binding_Binding_newStreamChunkFromPayload<'a>(
425    env: EnvParam<'a>,
426    stream_chunk_payload: JByteArray<'a>,
427) -> Pointer<'static, StreamChunk> {
428    execute_and_catch(env, move |env| {
429        let prost_stream_chumk =
430            Message::decode(to_guarded_slice(&stream_chunk_payload, env)?.deref())?;
431        Ok(StreamChunk::from_protobuf(&prost_stream_chumk)?.into())
432    })
433}
434
435#[unsafe(no_mangle)]
436extern "system" fn Java_com_risingwave_java_binding_Binding_newStreamChunkFromPretty<'a>(
437    env: EnvParam<'a>,
438    str: JString<'a>,
439) -> Pointer<'static, StreamChunk> {
440    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
441        Ok(StreamChunk::from_pretty(env.get_string(&str)?.to_str().unwrap()).into())
442    })
443}
444
445#[unsafe(no_mangle)]
446extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkClose(
447    _env: EnvParam<'_>,
448    chunk: OwnedPointer<StreamChunk>,
449) {
450    chunk.release()
451}
452
453#[unsafe(no_mangle)]
454extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetKey<'a>(
455    env: EnvParam<'a>,
456    pointer: Pointer<'a, JavaBindingIterator<'a>>,
457) -> JByteArray<'a> {
458    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
459        Ok(env.byte_array_from_slice(
460            pointer
461                .as_ref()
462                .cursor
463                .as_ref()
464                .expect("should exists when call get key")
465                .extra
466                .as_key()
467                .as_ref(),
468        )?)
469    })
470}
471
472#[unsafe(no_mangle)]
473extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetOp<'a>(
474    env: EnvParam<'a>,
475    pointer: Pointer<'a, JavaBindingIterator<'a>>,
476) -> jint {
477    execute_and_catch(env, move |_env| {
478        Ok(pointer
479            .as_ref()
480            .cursor
481            .as_ref()
482            .expect("should exist when call get op")
483            .extra
484            .as_op() as jint)
485    })
486}
487
488#[unsafe(no_mangle)]
489extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorIsNull<'a>(
490    env: EnvParam<'a>,
491    pointer: Pointer<'a, JavaBindingIterator<'a>>,
492    idx: jint,
493) -> jboolean {
494    execute_and_catch(env, move |_env| {
495        Ok(pointer.as_ref().datum_at(idx as usize).is_none() as jboolean)
496    })
497}
498
499#[unsafe(no_mangle)]
500extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt16Value<'a>(
501    env: EnvParam<'a>,
502    pointer: Pointer<'a, JavaBindingIterator<'a>>,
503    idx: jint,
504) -> jshort {
505    execute_and_catch(env, move |_env| {
506        Ok(pointer
507            .as_ref()
508            .datum_at(idx as usize)
509            .unwrap()
510            .into_int16())
511    })
512}
513
514#[unsafe(no_mangle)]
515extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt32Value<'a>(
516    env: EnvParam<'a>,
517    pointer: Pointer<'a, JavaBindingIterator<'a>>,
518    idx: jint,
519) -> jint {
520    execute_and_catch(env, move |_env| {
521        Ok(pointer
522            .as_ref()
523            .datum_at(idx as usize)
524            .unwrap()
525            .into_int32())
526    })
527}
528
529#[unsafe(no_mangle)]
530extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt64Value<'a>(
531    env: EnvParam<'a>,
532    pointer: Pointer<'a, JavaBindingIterator<'a>>,
533    idx: jint,
534) -> jlong {
535    execute_and_catch(env, move |_env| {
536        Ok(pointer
537            .as_ref()
538            .datum_at(idx as usize)
539            .unwrap()
540            .into_int64())
541    })
542}
543
544#[unsafe(no_mangle)]
545extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetFloatValue<'a>(
546    env: EnvParam<'a>,
547    pointer: Pointer<'a, JavaBindingIterator<'a>>,
548    idx: jint,
549) -> jfloat {
550    execute_and_catch(env, move |_env| {
551        Ok(pointer
552            .as_ref()
553            .datum_at(idx as usize)
554            .unwrap()
555            .into_float32()
556            .into())
557    })
558}
559
560#[unsafe(no_mangle)]
561extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDoubleValue<'a>(
562    env: EnvParam<'a>,
563    pointer: Pointer<'a, JavaBindingIterator<'a>>,
564    idx: jint,
565) -> jdouble {
566    execute_and_catch(env, move |_env| {
567        Ok(pointer
568            .as_ref()
569            .datum_at(idx as usize)
570            .unwrap()
571            .into_float64()
572            .into())
573    })
574}
575
576#[unsafe(no_mangle)]
577extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetBooleanValue<'a>(
578    env: EnvParam<'a>,
579    pointer: Pointer<'a, JavaBindingIterator<'a>>,
580    idx: jint,
581) -> jboolean {
582    execute_and_catch(env, move |_env| {
583        Ok(pointer.as_ref().datum_at(idx as usize).unwrap().into_bool() as jboolean)
584    })
585}
586
587#[unsafe(no_mangle)]
588extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetStringValue<'a>(
589    env: EnvParam<'a>,
590    pointer: Pointer<'a, JavaBindingIterator<'a>>,
591    idx: jint,
592) -> JString<'a> {
593    execute_and_catch(env, move |env: &mut EnvParam<'a>| {
594        Ok(env.new_string(pointer.as_ref().datum_at(idx as usize).unwrap().into_utf8())?)
595    })
596}
597
598#[unsafe(no_mangle)]
599extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetIntervalValue<'a>(
600    env: EnvParam<'a>,
601    pointer: Pointer<'a, JavaBindingIterator<'a>>,
602    idx: jint,
603) -> JString<'a> {
604    execute_and_catch(env, move |env: &mut EnvParam<'a>| {
605        let interval = pointer
606            .as_ref()
607            .datum_at(idx as usize)
608            .unwrap()
609            .into_interval()
610            .as_iso_8601();
611        Ok(env.new_string(interval)?)
612    })
613}
614
615#[unsafe(no_mangle)]
616extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetJsonbValue<'a>(
617    env: EnvParam<'a>,
618    pointer: Pointer<'a, JavaBindingIterator<'a>>,
619    idx: jint,
620) -> JString<'a> {
621    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
622        let jsonb = pointer
623            .as_ref()
624            .datum_at(idx as usize)
625            .unwrap()
626            .into_jsonb()
627            .to_string();
628        Ok(env.new_string(jsonb)?)
629    })
630}
631
632#[unsafe(no_mangle)]
633extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimestampValue<'a>(
634    env: EnvParam<'a>,
635    pointer: Pointer<'a, JavaBindingIterator<'a>>,
636    idx: jint,
637) -> JObject<'a> {
638    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
639        let value = pointer
640            .as_ref()
641            .datum_at(idx as usize)
642            .unwrap()
643            .into_timestamp();
644
645        let sig = gen_jni_sig!(java.time.LocalDateTime of(int year, int month, int dayOfMonth, int hour, int minute, int second, int nanoOfSecond));
646
647        let (timestamp_class_ref, constructor) = pointer
648            .as_ref()
649            .class_cache
650            .timestamp_ctor
651            .get_or_try_init(|| {
652                let cls = env.find_class(gen_class_name!(java.time.LocalDateTime))?;
653                let init_method = env.get_static_method_id(&cls, "of", sig)?;
654                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
655            })?;
656        unsafe {
657            let JValueOwned::Object(timestamp_obj) = env.call_static_method_unchecked(
658                <&JClass<'_>>::from(timestamp_class_ref.as_obj()),
659                *constructor,
660                ReturnType::Object,
661                &[
662                    jvalue { i: value.0.year() },
663                    jvalue {
664                        i: value.0.month() as i32,
665                    },
666                    jvalue {
667                        i: value.0.day() as i32,
668                    },
669                    jvalue {
670                        i: value.0.hour() as i32,
671                    },
672                    jvalue {
673                        i: value.0.minute() as i32,
674                    },
675                    jvalue {
676                        i: value.0.second() as i32,
677                    },
678                    jvalue {
679                        i: value.0.nanosecond() as i32,
680                    },
681                ],
682            )?
683            else {
684                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
685                    name: "of".to_owned(),
686                    sig: sig.into(),
687                }));
688            };
689            Ok(timestamp_obj)
690        }
691    })
692}
693
694#[unsafe(no_mangle)]
695extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimestamptzValue<'a>(
696    env: EnvParam<'a>,
697    pointer: Pointer<'a, JavaBindingIterator<'a>>,
698    idx: jint,
699) -> JObject<'a> {
700    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
701        let value = pointer
702            .as_ref()
703            .datum_at(idx as usize)
704            .unwrap()
705            .into_timestamptz();
706
707        let instant_sig =
708            gen_jni_sig!(java.time.Instant ofEpochSecond(long epochSecond, long nanoAdjustment));
709
710        let (instant_class_ref, instant_constructor) = pointer
711            .as_ref()
712            .class_cache
713            .instant_ctor
714            .get_or_try_init(|| {
715                let cls = env.find_class(gen_class_name!(java.time.Instant))?;
716                let init_method = env.get_static_method_id(&cls, "ofEpochSecond", instant_sig)?;
717                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
718            })?;
719        let instant_obj = unsafe {
720            let JValueOwned::Object(instant_obj) = env.call_static_method_unchecked(
721                <&JClass<'_>>::from(instant_class_ref.as_obj()),
722                *instant_constructor,
723                ReturnType::Object,
724                &[
725                    jvalue {
726                        j: value.timestamp(),
727                    },
728                    jvalue {
729                        j: value.timestamp_subsec_nanos() as i64,
730                    },
731                ],
732            )?
733            else {
734                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
735                    name: "ofEpochSecond".to_owned(),
736                    sig: instant_sig.into(),
737                }));
738            };
739            instant_obj
740        };
741
742        let utc_ref = pointer.as_ref().class_cache.utc.get_or_try_init(|| {
743            let cls = env.find_class(gen_class_name!(java.time.ZoneOffset))?;
744            let utc = env
745                .get_static_field(&cls, "UTC", gen_jni_type_sig!(java.time.ZoneOffset))?
746                .l()?;
747            env.new_global_ref(utc)
748        })?;
749
750        let sig = gen_jni_sig!(java.time.OffsetDateTime ofInstant(java.time.Instant instant, java.time.ZoneId zone));
751
752        let (timestamptz_class_ref, constructor) = pointer
753            .as_ref()
754            .class_cache
755            .timestamptz_ctor
756            .get_or_try_init(|| {
757                let cls = env.find_class(gen_class_name!(java.time.OffsetDateTime))?;
758                let init_method = env.get_static_method_id(&cls, "ofInstant", sig)?;
759                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
760            })?;
761        unsafe {
762            let JValueOwned::Object(timestamptz_obj) = env.call_static_method_unchecked(
763                <&JClass<'_>>::from(timestamptz_class_ref.as_obj()),
764                *constructor,
765                ReturnType::Object,
766                &[
767                    jvalue {
768                        l: instant_obj.as_raw(),
769                    },
770                    jvalue {
771                        l: utc_ref.as_obj().as_raw(),
772                    },
773                ],
774            )?
775            else {
776                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
777                    name: "ofInstant".to_owned(),
778                    sig: sig.into(),
779                }));
780            };
781            Ok(timestamptz_obj)
782        }
783    })
784}
785
786#[unsafe(no_mangle)]
787extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDecimalValue<'a>(
788    env: EnvParam<'a>,
789    pointer: Pointer<'a, JavaBindingIterator<'a>>,
790    idx: jint,
791) -> JObject<'a> {
792    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
793        let decimal_value = pointer
794            .as_ref()
795            .datum_at(idx as usize)
796            .unwrap()
797            .into_decimal();
798
799        match decimal_value {
800            Decimal::NaN | Decimal::NegativeInf | Decimal::PositiveInf => {
801                return Ok(JObject::null());
802            }
803            Decimal::Normalized(_) => {}
804        };
805
806        let value = decimal_value.to_string();
807        let string_value = env.new_string(value)?;
808        let (decimal_class_ref, constructor) = pointer
809            .as_ref()
810            .class_cache
811            .big_decimal_ctor
812            .get_or_try_init(|| {
813                let cls = env.find_class("java/math/BigDecimal")?;
814                let init_method = env.get_method_id(&cls, "<init>", "(Ljava/lang/String;)V")?;
815                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
816            })?;
817        unsafe {
818            let decimal_class = <&JClass<'_>>::from(decimal_class_ref.as_obj());
819            let date_obj = env.new_object_unchecked(
820                decimal_class,
821                *constructor,
822                &[jvalue {
823                    l: string_value.into_raw(),
824                }],
825            )?;
826            Ok(date_obj)
827        }
828    })
829}
830
831#[unsafe(no_mangle)]
832extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue<'a>(
833    env: EnvParam<'a>,
834    pointer: Pointer<'a, JavaBindingIterator<'a>>,
835    idx: jint,
836) -> JObject<'a> {
837    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
838        let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_date();
839        let epoch_days = (value.0 - DateTime::UNIX_EPOCH.date_naive()).num_days();
840
841        let sig = gen_jni_sig!(java.time.LocalDate ofEpochDay(long));
842
843        let (date_class_ref, constructor) =
844            pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| {
845                let cls = env.find_class(gen_class_name!(java.time.LocalDate))?;
846                let init_method = env.get_static_method_id(&cls, "ofEpochDay", sig)?;
847                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
848            })?;
849        unsafe {
850            let JValueOwned::Object(date_obj) = env.call_static_method_unchecked(
851                <&JClass<'_>>::from(date_class_ref.as_obj()),
852                *constructor,
853                ReturnType::Object,
854                &[jvalue { j: epoch_days }],
855            )?
856            else {
857                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
858                    name: "ofEpochDay".to_owned(),
859                    sig: sig.into(),
860                }));
861            };
862            Ok(date_obj)
863        }
864    })
865}
866
867#[unsafe(no_mangle)]
868extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue<'a>(
869    env: EnvParam<'a>,
870    pointer: Pointer<'a, JavaBindingIterator<'a>>,
871    idx: jint,
872) -> JObject<'a> {
873    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
874        let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_time();
875
876        let sig = gen_jni_sig!(java.time.LocalTime of(int hour, int minute, int second, int nanoOfSecond));
877
878        let (time_class_ref, constructor) =
879            pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| {
880                let cls = env.find_class(gen_class_name!(java.time.LocalTime))?;
881                let init_method = env.get_static_method_id(&cls, "of", sig)?;
882                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
883            })?;
884        unsafe {
885            let JValueOwned::Object(time_obj) = env.call_static_method_unchecked(
886                <&JClass<'_>>::from(time_class_ref.as_obj()),
887                *constructor,
888                ReturnType::Object,
889                &[
890                    jvalue {
891                        i: value.0.hour() as i32,
892                    },
893                    jvalue {
894                        i: value.0.minute() as i32,
895                    },
896                    jvalue {
897                        i: value.0.second() as i32,
898                    },
899                    jvalue {
900                        i: value.0.nanosecond() as i32,
901                    },
902                ],
903            )?
904            else {
905                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
906                    name: "of".to_owned(),
907                    sig: sig.into(),
908                }));
909            };
910            Ok(time_obj)
911        }
912    })
913}
914
915#[unsafe(no_mangle)]
916extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetByteaValue<'a>(
917    env: EnvParam<'a>,
918    pointer: Pointer<'a, JavaBindingIterator<'a>>,
919    idx: jint,
920) -> JByteArray<'a> {
921    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
922        let bytes = pointer
923            .as_ref()
924            .datum_at(idx as usize)
925            .unwrap()
926            .into_bytea();
927        Ok(env.byte_array_from_slice(bytes)?)
928    })
929}
930
931#[unsafe(no_mangle)]
932extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValue<'a>(
933    env: EnvParam<'a>,
934    pointer: Pointer<'a, JavaBindingIterator<'a>>,
935    idx: jint,
936    class: JClass<'a>,
937) -> JObject<'a> {
938    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
939        let elems = pointer
940            .as_ref()
941            .datum_at(idx as usize)
942            .unwrap()
943            .into_list()
944            .iter();
945
946        // convert the Rust elements to a Java object array (Object[])
947        let jarray = env.new_object_array(elems.len() as jsize, &class, JObject::null())?;
948
949        for (i, ele) in elems.enumerate() {
950            let index = i as jsize;
951            match ele {
952                None => env.set_object_array_element(&jarray, i as jsize, JObject::null())?,
953                Some(val) => match val {
954                    ScalarRefImpl::Int16(v) => {
955                        let o = call_static_method!(
956                            env,
957                            {Short},
958                            {Short	valueOf(short s)},
959                            v
960                        )?;
961                        env.set_object_array_element(&jarray, index, &o)?;
962                    }
963                    ScalarRefImpl::Int32(v) => {
964                        let o = call_static_method!(
965                            env,
966                            {Integer},
967                            {Integer	valueOf(int i)},
968                            v
969                        )?;
970                        env.set_object_array_element(&jarray, index, &o)?;
971                    }
972                    ScalarRefImpl::Int64(v) => {
973                        let o = call_static_method!(
974                            env,
975                            {Long},
976                            {Long	valueOf(long l)},
977                            v
978                        )?;
979                        env.set_object_array_element(&jarray, index, &o)?;
980                    }
981                    ScalarRefImpl::Float32(v) => {
982                        let o = call_static_method!(
983                            env,
984                            {Float},
985                            {Float	valueOf(float f)},
986                            v.into_inner()
987                        )?;
988                        env.set_object_array_element(&jarray, index, &o)?;
989                    }
990                    ScalarRefImpl::Float64(v) => {
991                        let o = call_static_method!(
992                            env,
993                            {Double},
994                            {Double	valueOf(double d)},
995                            v.into_inner()
996                        )?;
997                        env.set_object_array_element(&jarray, index, &o)?;
998                    }
999                    ScalarRefImpl::Utf8(v) => {
1000                        let obj = env.new_string(v)?;
1001                        env.set_object_array_element(&jarray, index, obj)?
1002                    }
1003                    _ => env.set_object_array_element(&jarray, index, JObject::null())?,
1004                },
1005            }
1006        }
1007        let output = unsafe { JObject::from_raw(jarray.into_raw()) };
1008        Ok(output)
1009    })
1010}
1011
1012pub type JniSenderType<T> = Sender<anyhow::Result<T>>;
1013pub type JniReceiverType<T> = Receiver<T>;
1014
1015/// Send messages to the channel received by `CdcSplitReader`.
1016/// If msg is null, just check whether the channel is closed.
1017/// Return true if sending is successful, otherwise, return false so that caller can stop
1018/// gracefully.
1019#[unsafe(no_mangle)]
1020extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>(
1021    env: EnvParam<'a>,
1022    channel: Pointer<'a, JniSenderType<GetEventStreamResponse>>,
1023    msg: JByteArray<'a>,
1024) -> jboolean {
1025    execute_and_catch(env, move |env| {
1026        // If msg is null means just check whether channel is closed.
1027        if msg.is_null() {
1028            if channel.as_ref().is_closed() {
1029                return Ok(JNI_FALSE);
1030            } else {
1031                return Ok(JNI_TRUE);
1032            }
1033        }
1034
1035        let get_event_stream_response: GetEventStreamResponse =
1036            Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1037
1038        match channel
1039            .as_ref()
1040            .blocking_send(Ok(get_event_stream_response))
1041        {
1042            Ok(_) => Ok(JNI_TRUE),
1043            Err(e) => {
1044                tracing::info!(error = %e.as_report(), "send error");
1045                Ok(JNI_FALSE)
1046            }
1047        }
1048    })
1049}
1050
1051#[unsafe(no_mangle)]
1052extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceErrorToChannel<'a>(
1053    env: EnvParam<'a>,
1054    channel: Pointer<'a, JniSenderType<GetEventStreamResponse>>,
1055    msg: JString<'a>,
1056) -> jboolean {
1057    execute_and_catch(env, move |env| {
1058        let ret = env.get_string(&msg);
1059        match ret {
1060            Ok(str) => {
1061                let err_msg: String = str.into();
1062                match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) {
1063                    Ok(_) => Ok(JNI_TRUE),
1064                    Err(e) => {
1065                        tracing::info!(error = ?e.as_report(), "send error");
1066                        Ok(JNI_FALSE)
1067                    }
1068                }
1069            }
1070            Err(err) => {
1071                if msg.is_null() {
1072                    tracing::warn!("source error message is null");
1073                    Ok(JNI_TRUE)
1074                } else {
1075                    tracing::error!(error = ?err.as_report(), "source error message should be a java string");
1076                    Ok(JNI_FALSE)
1077                }
1078            }
1079        }
1080    })
1081}
1082
1083#[unsafe(no_mangle)]
1084extern "system" fn Java_com_risingwave_java_binding_Binding_cdcSourceSenderClose(
1085    _env: EnvParam<'_>,
1086    channel: OwnedPointer<JniSenderType<GetEventStreamResponse>>,
1087) {
1088    channel.release();
1089}
1090
1091pub enum JniSinkWriterStreamRequest {
1092    PbRequest(SinkWriterStreamRequest),
1093    Chunk {
1094        epoch: u64,
1095        batch_id: u64,
1096        chunk: StreamChunk,
1097    },
1098}
1099
1100impl From<SinkWriterStreamRequest> for JniSinkWriterStreamRequest {
1101    fn from(value: SinkWriterStreamRequest) -> Self {
1102        Self::PbRequest(value)
1103    }
1104}
1105
1106#[unsafe(no_mangle)]
1107pub extern "system" fn Java_com_risingwave_java_binding_Binding_recvSinkWriterRequestFromChannel<
1108    'a,
1109>(
1110    env: EnvParam<'a>,
1111    mut channel: Pointer<'a, JniReceiverType<JniSinkWriterStreamRequest>>,
1112) -> JObject<'a> {
1113    execute_and_catch(env, move |env| match channel.as_mut().blocking_recv() {
1114        Some(msg) => {
1115            let obj = match msg {
1116                JniSinkWriterStreamRequest::PbRequest(request) => {
1117                    let bytes = env.byte_array_from_slice(&Message::encode_to_vec(&request))?;
1118                    let jobj = JObject::from(bytes);
1119                    call_static_method!(
1120                        env,
1121                        {com.risingwave.java.binding.JniSinkWriterStreamRequest},
1122                        {com.risingwave.java.binding.JniSinkWriterStreamRequest fromSerializedPayload(byte[] payload)},
1123                        &jobj
1124                    )?
1125                }
1126                JniSinkWriterStreamRequest::Chunk {
1127                    epoch,
1128                    batch_id,
1129                    chunk,
1130                } => {
1131                    let pointer = Box::into_raw(Box::new(chunk));
1132                    call_static_method!(
1133                        env,
1134                        {com.risingwave.java.binding.JniSinkWriterStreamRequest},
1135                        {com.risingwave.java.binding.JniSinkWriterStreamRequest fromStreamChunkOwnedPointer(long pointer, long epoch, long batchId)},
1136                        pointer as u64, epoch, batch_id
1137                    )
1138                    .inspect_err(|_| unsafe {
1139                        // release the stream chunk on err
1140                        drop(Box::from_raw(pointer));
1141                    })?
1142                }
1143            };
1144            Ok(obj)
1145        }
1146        None => Ok(JObject::null()),
1147    })
1148}
1149
1150#[unsafe(no_mangle)]
1151pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterResponseToChannel<
1152    'a,
1153>(
1154    env: EnvParam<'a>,
1155    channel: Pointer<'a, JniSenderType<SinkWriterStreamResponse>>,
1156    msg: JByteArray<'a>,
1157) -> jboolean {
1158    execute_and_catch(env, move |env| {
1159        let sink_writer_stream_response: SinkWriterStreamResponse =
1160            Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1161
1162        match channel
1163            .as_ref()
1164            .blocking_send(Ok(sink_writer_stream_response))
1165        {
1166            Ok(_) => Ok(JNI_TRUE),
1167            Err(e) => {
1168                tracing::info!(error = ?e.as_report(), "send error");
1169                Ok(JNI_FALSE)
1170            }
1171        }
1172    })
1173}
1174
1175#[unsafe(no_mangle)]
1176pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterErrorToChannel<'a>(
1177    env: EnvParam<'a>,
1178    channel: Pointer<'a, Sender<anyhow::Result<SinkWriterStreamResponse>>>,
1179    msg: JString<'a>,
1180) -> jboolean {
1181    execute_and_catch(env, move |env| {
1182        let ret = env.get_string(&msg);
1183        match ret {
1184            Ok(str) => {
1185                let err_msg: String = str.into();
1186                match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) {
1187                    Ok(_) => Ok(JNI_TRUE),
1188                    Err(e) => {
1189                        tracing::info!(error = ?e.as_report(), "send error");
1190                        Ok(JNI_FALSE)
1191                    }
1192                }
1193            }
1194            Err(err) => {
1195                if msg.is_null() {
1196                    tracing::warn!("sink error message is null");
1197                    Ok(JNI_TRUE)
1198                } else {
1199                    tracing::error!(error = ?err.as_report(), "sink error message should be a java string");
1200                    Ok(JNI_FALSE)
1201                }
1202            }
1203        }
1204    })
1205}
1206
1207#[unsafe(no_mangle)]
1208pub extern "system" fn Java_com_risingwave_java_binding_Binding_recvSinkCoordinatorRequestFromChannel<
1209    'a,
1210>(
1211    env: EnvParam<'a>,
1212    mut channel: Pointer<'a, JniReceiverType<SinkCoordinatorStreamRequest>>,
1213) -> JByteArray<'a> {
1214    execute_and_catch(env, move |env| match channel.as_mut().blocking_recv() {
1215        Some(msg) => {
1216            let bytes = env
1217                .byte_array_from_slice(&Message::encode_to_vec(&msg))
1218                .unwrap();
1219            Ok(bytes)
1220        }
1221        None => Ok(JObject::null().into()),
1222    })
1223}
1224
1225#[unsafe(no_mangle)]
1226pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkCoordinatorResponseToChannel<
1227    'a,
1228>(
1229    env: EnvParam<'a>,
1230    channel: Pointer<'a, JniSenderType<SinkCoordinatorStreamResponse>>,
1231    msg: JByteArray<'a>,
1232) -> jboolean {
1233    execute_and_catch(env, move |env| {
1234        let sink_coordinator_stream_response: SinkCoordinatorStreamResponse =
1235            Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1236
1237        match channel
1238            .as_ref()
1239            .blocking_send(Ok(sink_coordinator_stream_response))
1240        {
1241            Ok(_) => Ok(JNI_TRUE),
1242            Err(e) => {
1243                tracing::info!(error = ?e.as_report(), "send error");
1244                Ok(JNI_FALSE)
1245            }
1246        }
1247    })
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252    use risingwave_common::types::Timestamptz;
1253
1254    /// make sure that the [`ScalarRefImpl::Int64`] received by
1255    /// [`Java_com_risingwave_java_binding_Binding_iteratorGetTimestampValue`]
1256    /// is of type [`DataType::Timestamptz`] stored in microseconds
1257    #[test]
1258    fn test_timestamptz_to_i64() {
1259        assert_eq!(
1260            "2023-06-01 09:45:00+08:00".parse::<Timestamptz>().unwrap(),
1261            Timestamptz::from_micros(1_685_583_900_000_000)
1262        );
1263    }
1264}