1#![feature(error_generic_member_access)]
16#![feature(once_cell_try)]
17#![feature(type_alias_impl_trait)]
18#![feature(try_blocks)]
19#![feature(used_with_arg)]
20
21pub mod jvm_runtime;
22mod macros;
23mod opendal_schema_history;
24mod tracing_slf4j;
25
26use std::backtrace::Backtrace;
27use std::marker::PhantomData;
28use std::ops::{Deref, DerefMut};
29use std::slice::from_raw_parts;
30use std::sync::{LazyLock, OnceLock};
31
32use anyhow::anyhow;
33use bytes::Bytes;
34use cfg_or_panic::cfg_or_panic;
35use chrono::{DateTime, Datelike, Timelike};
36use futures::TryStreamExt;
37use futures::stream::BoxStream;
38use jni::JNIEnv;
39use jni::objects::{
40 AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString,
41 JValueOwned, ReleaseMode,
42};
43use jni::signature::ReturnType;
44use jni::sys::{
45 JNI_FALSE, JNI_TRUE, jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue,
46};
47pub use paste::paste;
48use prost::{DecodeError, Message};
49use risingwave_common::array::{ArrayError, StreamChunk};
50use risingwave_common::hash::VirtualNode;
51use risingwave_common::row::{OwnedRow, Row};
52use risingwave_common::test_prelude::StreamChunkTestExt;
53use risingwave_common::types::{Decimal, ScalarRefImpl};
54use risingwave_common::util::panic::rw_catch_unwind;
55use risingwave_pb::connector_service::{
56 GetEventStreamResponse, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse,
57 SinkWriterStreamRequest, SinkWriterStreamResponse,
58};
59use risingwave_pb::data::Op;
60use thiserror::Error;
61use thiserror_ext::AsReport;
62use tokio::runtime::Runtime;
63use tokio::sync::mpsc::{Receiver, Sender};
64use tracing_slf4j::*;
65
/// Expands to `use risingwave_jni_core as _;`, an anonymous import of this crate.
///
/// NOTE(review): presumably invoked by downstream crates so that this crate (and the JNI
/// functions it exports) is linked into the final binary — confirm against the call sites.
#[macro_export]
macro_rules! enable {
    () => {
        use risingwave_jni_core as _;
    };
}
75
/// Process-wide Tokio runtime, created lazily on first access.
///
/// Within this file it is used to block on the asynchronous Hummock iterator from a JNI call.
/// The first access panics if the runtime cannot be created.
pub static JAVA_BINDING_ASYNC_RUNTIME: LazyLock<Runtime> =
    LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
78
/// Errors that can be returned by the closures run through [`execute_and_catch`].
///
/// Every variant wraps a source error (convertible via `From`, so `?` works directly) and
/// carries a [`Backtrace`] field next to it.
#[derive(Error, Debug)]
pub enum BindingError {
    /// An error reported by the `jni` crate.
    #[error("JniError {error}")]
    Jni {
        #[from]
        error: jni::errors::Error,
        backtrace: Backtrace,
    },

    /// Any `anyhow::Error`; in this file it is produced when the Hummock iterator fails.
    #[error("StorageError {error}")]
    Storage {
        #[from]
        error: anyhow::Error,
        backtrace: Backtrace,
    },

    /// A protobuf payload could not be decoded.
    #[error("DecodeError {error}")]
    Decode {
        #[from]
        error: DecodeError,
        backtrace: Backtrace,
    },

    /// An [`ArrayError`]; in this file it is produced when building a `StreamChunk` from its
    /// protobuf form fails.
    #[error("StreamChunkArrayError {error}")]
    StreamChunkArray {
        #[from]
        error: ArrayError,
        backtrace: Backtrace,
    },
}
109
/// Result alias used throughout this file, with [`BindingError`] as the error type.
type Result<T> = std::result::Result<T, BindingError>;
111
112pub fn to_guarded_slice<'array, 'env>(
113 array: &'array JByteArray<'env>,
114 env: &'array mut JNIEnv<'env>,
115) -> Result<SliceGuard<'env, 'array>> {
116 unsafe {
117 let array = env.get_array_elements(array, ReleaseMode::NoCopyBack)?;
118 let slice = from_raw_parts(array.as_ptr() as *mut u8, array.len());
119
120 Ok(SliceGuard {
121 _array: array,
122 slice,
123 })
124 }
125}
126
/// A byte-slice view over the elements of a Java byte array.
///
/// Produced by [`to_guarded_slice`]; dereferences to `[u8]`.
pub struct SliceGuard<'env, 'array> {
    // Never read directly; stored so the elements backing `slice` live as long as the guard.
    _array: AutoElements<'env, 'env, 'array, jbyte>,
    // View over the same elements, created from the raw pointer of `_array`.
    slice: &'array [u8],
}
132
133impl Deref for SliceGuard<'_, '_> {
134 type Target = [u8];
135
136 fn deref(&self) -> &Self::Target {
137 self.slice
138 }
139}
140
/// A Rust pointer to a `T`, stored as a `jlong`.
///
/// `#[repr(transparent)]` gives it the same layout as `jlong`; the exported JNI functions in
/// this file take and return it directly in their signatures. A value of `0` is the null
/// pointer (see the `Default` impl); `as_ref`/`as_mut` assert that it is non-zero.
#[repr(transparent)]
pub struct Pointer<'a, T> {
    // Address of the pointee, or 0.
    pointer: jlong,
    // Ties the pointer to the lifetime `'a` and the pointee type `T` without storing a `T`.
    _phantom: PhantomData<&'a T>,
}
146
147impl<T> Default for Pointer<'_, T> {
148 fn default() -> Self {
149 Self {
150 pointer: 0,
151 _phantom: Default::default(),
152 }
153 }
154}
155
156impl<T> From<T> for Pointer<'static, T> {
157 fn from(value: T) -> Self {
158 Pointer {
159 pointer: Box::into_raw(Box::new(value)) as jlong,
160 _phantom: PhantomData,
161 }
162 }
163}
164
165impl<'a, T> Pointer<'a, T> {
166 fn as_ref(&self) -> &'a T {
167 assert!(self.pointer != 0);
168 unsafe { &*(self.pointer as *const T) }
169 }
170
171 fn as_mut(&mut self) -> &'a mut T {
172 assert!(self.pointer != 0);
173 unsafe { &mut *(self.pointer as *mut T) }
174 }
175}
176
/// A [`Pointer`] with the `'static` lifetime. It is the type produced by `Pointer::from(value)`
/// and the one on which `into_pointer` and `release` are defined.
pub type OwnedPointer<T> = Pointer<'static, T>;
182
183impl<T> OwnedPointer<T> {
184 pub fn into_pointer(self) -> jlong {
186 self.pointer
187 }
188
189 fn release(self) {
191 tracing::debug!(
192 type_name = std::any::type_name::<T>(),
193 address = %format_args!("{:x}", self.pointer),
194 "release jni OwnedPointer"
195 );
196 assert!(self.pointer != 0);
197 unsafe { drop(Box::from_raw(self.pointer as *mut T)) }
198 }
199}
200
/// The JNI environment and the class object, bundled as one `#[repr(C)]` value.
///
/// Every exported JNI function in this file takes it as its first parameter.
/// NOTE(review): this presumably relies on the two leading JNI arguments (`JNIEnv*`, `jclass`)
/// being passed exactly like a two-field C struct on the supported ABIs — confirm.
#[repr(C)]
pub struct EnvParam<'a> {
    env: JNIEnv<'a>,
    class: JClass<'a>,
}
208
209impl<'a> Deref for EnvParam<'a> {
210 type Target = JNIEnv<'a>;
211
212 fn deref(&self) -> &Self::Target {
213 &self.env
214 }
215}
216
217impl DerefMut for EnvParam<'_> {
218 fn deref_mut(&mut self) -> &mut Self::Target {
219 &mut self.env
220 }
221}
222
223impl<'a> EnvParam<'a> {
224 pub fn get_class(&self) -> &JClass<'a> {
225 &self.class
226 }
227}
228
229pub fn execute_and_catch<'env, F, Ret>(mut env: EnvParam<'env>, inner: F) -> Ret
230where
231 F: FnOnce(&mut EnvParam<'env>) -> Result<Ret>,
232 Ret: Default + 'env,
233{
234 match rw_catch_unwind(std::panic::AssertUnwindSafe(|| inner(&mut env))) {
235 Ok(Ok(ret)) => ret,
236 Ok(Err(e)) => {
237 match e {
238 BindingError::Jni {
239 error: jni::errors::Error::JavaException,
240 backtrace,
241 } => {
242 tracing::error!("get JavaException thrown from: {:?}", backtrace);
243 }
245 _ => {
246 env.throw(format!("get error while processing: {:?}", e.as_report()))
247 .expect("should be able to throw");
248 }
249 }
250 Ret::default()
251 }
252 Err(e) => {
253 env.throw(format!("panic while processing: {:?}", e))
254 .expect("should be able to throw");
255 Ret::default()
256 }
257 }
258}
259
/// Lazily filled cache of Java class references and method ids.
///
/// One instance is stored in each [`JavaBindingIterator`]. Each slot is initialised on first
/// use by the getter that needs it (via `get_or_try_init`), so the class/method lookup
/// happens at most once per iterator.
#[derive(Default)]
struct JavaClassMethodCache {
    // `java.math.BigDecimal` and its `(String)` constructor.
    big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>,

    // `java.time.LocalDateTime` and its static `of(...)` factory.
    timestamp_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
    // `java.time.OffsetDateTime` and its static `ofInstant(...)` factory.
    timestamptz_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
    // `java.time.LocalDate` and its static `ofEpochDay(...)` factory.
    date_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
    // `java.time.LocalTime` and its static `of(...)` factory.
    time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
    // `java.time.Instant` and its static `ofEpochSecond(...)` factory.
    instant_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
    // The `java.time.ZoneOffset.UTC` static field value.
    utc: OnceLock<GlobalRef>,
}
271
// Private module that holds the opaque iterator type together with the only function that
// defines it (marked with `#[define_opaque]`).
mod opaque_type {
    use super::*;
    /// Opaque iterator over the rows of a `StreamChunk`, yielding `(Op, OwnedRow)` pairs.
    pub type StreamChunkRowIterator<'a> = impl Iterator<Item = (Op, OwnedRow)> + 'a;

    impl<'a> JavaBindingIteratorInner<'a> {
        /// Builds the `StreamChunk` variant from a borrowed chunk.
        ///
        /// Each row is converted to an owned row and its op to the protobuf `Op` as the
        /// iterator is advanced.
        #[define_opaque(StreamChunkRowIterator)]
        pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> {
            JavaBindingIteratorInner::StreamChunk(
                chunk
                    .rows()
                    .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())),
            )
        }
    }
}
288pub use opaque_type::StreamChunkRowIterator;
/// Boxed asynchronous stream of `(key, row)` pairs; each item may instead be an error.
pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>;
/// The two row sources a [`JavaBindingIterator`] can be backed by.
pub enum JavaBindingIteratorInner<'a> {
    /// Asynchronous stream of `(key, row)` pairs.
    Hummock(HummockJavaBindingIterator),
    /// Synchronous iterator of `(op, row)` pairs borrowed from a `StreamChunk`.
    StreamChunk(StreamChunkRowIterator<'a>),
}
294
/// Extra data stored next to the current row; which variant is used depends on the source.
enum RowExtra {
    /// Set for rows coming from a `StreamChunk`.
    Op(Op),
    /// Set for rows coming from the Hummock stream.
    Key(Bytes),
}
299
300impl RowExtra {
301 fn as_op(&self) -> Op {
302 match self {
303 RowExtra::Op(op) => *op,
304 RowExtra::Key(_) => unreachable!("should be op"),
305 }
306 }
307
308 fn as_key(&self) -> &Bytes {
309 match self {
310 RowExtra::Key(key) => key,
311 RowExtra::Op(_) => unreachable!("should be key"),
312 }
313 }
314}
315
/// The row an iterator currently points at, plus its source-specific extra data.
struct RowCursor {
    row: OwnedRow,
    extra: RowExtra,
}
320
/// Row iterator handed to Java as an opaque pointer.
///
/// Dereferences to the current row, so row accessors can be called on it directly.
pub struct JavaBindingIterator<'a> {
    // Where the rows come from.
    inner: JavaBindingIteratorInner<'a>,
    // Current row; `None` before the first advance and after the source is exhausted.
    cursor: Option<RowCursor>,
    // Cached Java classes/methods used when converting values to Java objects.
    class_cache: JavaClassMethodCache,
}
326
327impl JavaBindingIterator<'static> {
328 pub fn new_hummock_iter(iter: HummockJavaBindingIterator) -> Self {
329 Self {
330 inner: JavaBindingIteratorInner::Hummock(iter),
331 cursor: None,
332 class_cache: Default::default(),
333 }
334 }
335}
336
337impl Deref for JavaBindingIterator<'_> {
338 type Target = OwnedRow;
339
340 fn deref(&self) -> &Self::Target {
341 &self
342 .cursor
343 .as_ref()
344 .expect("should exist when call row methods")
345 .row
346 }
347}
348
/// JNI entry point: returns `VirtualNode::COUNT_FOR_COMPAT` as a Java `int`.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_defaultVnodeCount(
    _env: EnvParam<'_>,
) -> jint {
    VirtualNode::COUNT_FOR_COMPAT as jint
}
355
356#[cfg_or_panic(not(madsim))]
357#[unsafe(no_mangle)]
358extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNewStreamChunk<'a>(
359 env: EnvParam<'a>,
360 chunk: Pointer<'a, StreamChunk>,
361) -> Pointer<'static, JavaBindingIterator<'a>> {
362 execute_and_catch(env, move |_env| {
363 let iter = JavaBindingIterator {
364 inner: JavaBindingIteratorInner::from_chunk(chunk.as_ref()),
365 cursor: None,
366 class_cache: Default::default(),
367 };
368 Ok(iter.into())
369 })
370}
371
372#[cfg_or_panic(not(madsim))]
373#[unsafe(no_mangle)]
374extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>(
375 env: EnvParam<'a>,
376 mut pointer: Pointer<'a, JavaBindingIterator<'a>>,
377) -> jboolean {
378 execute_and_catch(env, move |_env| {
379 let iter = pointer.as_mut();
380 match &mut iter.inner {
381 JavaBindingIteratorInner::Hummock(hummock_iter) => {
382 match JAVA_BINDING_ASYNC_RUNTIME.block_on(hummock_iter.try_next())? {
383 None => {
384 iter.cursor = None;
385 Ok(JNI_FALSE)
386 }
387 Some((key, row)) => {
388 iter.cursor = Some(RowCursor {
389 row,
390 extra: RowExtra::Key(key),
391 });
392 Ok(JNI_TRUE)
393 }
394 }
395 }
396 JavaBindingIteratorInner::StreamChunk(stream_chunk_iter) => {
397 match stream_chunk_iter.next() {
398 None => {
399 iter.cursor = None;
400 Ok(JNI_FALSE)
401 }
402 Some((op, row)) => {
403 iter.cursor = Some(RowCursor {
404 row,
405 extra: RowExtra::Op(op),
406 });
407 Ok(JNI_TRUE)
408 }
409 }
410 }
411 }
412 })
413}
414
/// JNI entry point: drops the iterator behind `pointer`.
///
/// `release` asserts that the pointer is non-null. Unlike most entry points in this file,
/// the call is not wrapped in `execute_and_catch`.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorClose<'a>(
    _env: EnvParam<'a>,
    pointer: OwnedPointer<JavaBindingIterator<'a>>,
) {
    pointer.release()
}
422
423#[unsafe(no_mangle)]
424extern "system" fn Java_com_risingwave_java_binding_Binding_newStreamChunkFromPayload<'a>(
425 env: EnvParam<'a>,
426 stream_chunk_payload: JByteArray<'a>,
427) -> Pointer<'static, StreamChunk> {
428 execute_and_catch(env, move |env| {
429 let prost_stream_chumk =
430 Message::decode(to_guarded_slice(&stream_chunk_payload, env)?.deref())?;
431 Ok(StreamChunk::from_protobuf(&prost_stream_chumk)?.into())
432 })
433}
434
435#[unsafe(no_mangle)]
436extern "system" fn Java_com_risingwave_java_binding_Binding_newStreamChunkFromPretty<'a>(
437 env: EnvParam<'a>,
438 str: JString<'a>,
439) -> Pointer<'static, StreamChunk> {
440 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
441 Ok(StreamChunk::from_pretty(env.get_string(&str)?.to_str().unwrap()).into())
442 })
443}
444
/// JNI entry point: drops the `StreamChunk` behind `chunk`.
///
/// `release` asserts that the pointer is non-null. Unlike most entry points in this file,
/// the call is not wrapped in `execute_and_catch`.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkClose(
    _env: EnvParam<'_>,
    chunk: OwnedPointer<StreamChunk>,
) {
    chunk.release()
}
452
453#[unsafe(no_mangle)]
454extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetKey<'a>(
455 env: EnvParam<'a>,
456 pointer: Pointer<'a, JavaBindingIterator<'a>>,
457) -> JByteArray<'a> {
458 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
459 Ok(env.byte_array_from_slice(
460 pointer
461 .as_ref()
462 .cursor
463 .as_ref()
464 .expect("should exists when call get key")
465 .extra
466 .as_key()
467 .as_ref(),
468 )?)
469 })
470}
471
472#[unsafe(no_mangle)]
473extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetOp<'a>(
474 env: EnvParam<'a>,
475 pointer: Pointer<'a, JavaBindingIterator<'a>>,
476) -> jint {
477 execute_and_catch(env, move |_env| {
478 Ok(pointer
479 .as_ref()
480 .cursor
481 .as_ref()
482 .expect("should exist when call get op")
483 .extra
484 .as_op() as jint)
485 })
486}
487
488#[unsafe(no_mangle)]
489extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorIsNull<'a>(
490 env: EnvParam<'a>,
491 pointer: Pointer<'a, JavaBindingIterator<'a>>,
492 idx: jint,
493) -> jboolean {
494 execute_and_catch(env, move |_env| {
495 Ok(pointer.as_ref().datum_at(idx as usize).is_none() as jboolean)
496 })
497}
498
499#[unsafe(no_mangle)]
500extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt16Value<'a>(
501 env: EnvParam<'a>,
502 pointer: Pointer<'a, JavaBindingIterator<'a>>,
503 idx: jint,
504) -> jshort {
505 execute_and_catch(env, move |_env| {
506 Ok(pointer
507 .as_ref()
508 .datum_at(idx as usize)
509 .unwrap()
510 .into_int16())
511 })
512}
513
514#[unsafe(no_mangle)]
515extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt32Value<'a>(
516 env: EnvParam<'a>,
517 pointer: Pointer<'a, JavaBindingIterator<'a>>,
518 idx: jint,
519) -> jint {
520 execute_and_catch(env, move |_env| {
521 Ok(pointer
522 .as_ref()
523 .datum_at(idx as usize)
524 .unwrap()
525 .into_int32())
526 })
527}
528
529#[unsafe(no_mangle)]
530extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt64Value<'a>(
531 env: EnvParam<'a>,
532 pointer: Pointer<'a, JavaBindingIterator<'a>>,
533 idx: jint,
534) -> jlong {
535 execute_and_catch(env, move |_env| {
536 Ok(pointer
537 .as_ref()
538 .datum_at(idx as usize)
539 .unwrap()
540 .into_int64())
541 })
542}
543
544#[unsafe(no_mangle)]
545extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetFloatValue<'a>(
546 env: EnvParam<'a>,
547 pointer: Pointer<'a, JavaBindingIterator<'a>>,
548 idx: jint,
549) -> jfloat {
550 execute_and_catch(env, move |_env| {
551 Ok(pointer
552 .as_ref()
553 .datum_at(idx as usize)
554 .unwrap()
555 .into_float32()
556 .into())
557 })
558}
559
560#[unsafe(no_mangle)]
561extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDoubleValue<'a>(
562 env: EnvParam<'a>,
563 pointer: Pointer<'a, JavaBindingIterator<'a>>,
564 idx: jint,
565) -> jdouble {
566 execute_and_catch(env, move |_env| {
567 Ok(pointer
568 .as_ref()
569 .datum_at(idx as usize)
570 .unwrap()
571 .into_float64()
572 .into())
573 })
574}
575
576#[unsafe(no_mangle)]
577extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetBooleanValue<'a>(
578 env: EnvParam<'a>,
579 pointer: Pointer<'a, JavaBindingIterator<'a>>,
580 idx: jint,
581) -> jboolean {
582 execute_and_catch(env, move |_env| {
583 Ok(pointer.as_ref().datum_at(idx as usize).unwrap().into_bool() as jboolean)
584 })
585}
586
587#[unsafe(no_mangle)]
588extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetStringValue<'a>(
589 env: EnvParam<'a>,
590 pointer: Pointer<'a, JavaBindingIterator<'a>>,
591 idx: jint,
592) -> JString<'a> {
593 execute_and_catch(env, move |env: &mut EnvParam<'a>| {
594 Ok(env.new_string(pointer.as_ref().datum_at(idx as usize).unwrap().into_utf8())?)
595 })
596}
597
598#[unsafe(no_mangle)]
599extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetIntervalValue<'a>(
600 env: EnvParam<'a>,
601 pointer: Pointer<'a, JavaBindingIterator<'a>>,
602 idx: jint,
603) -> JString<'a> {
604 execute_and_catch(env, move |env: &mut EnvParam<'a>| {
605 let interval = pointer
606 .as_ref()
607 .datum_at(idx as usize)
608 .unwrap()
609 .into_interval()
610 .as_iso_8601();
611 Ok(env.new_string(interval)?)
612 })
613}
614
615#[unsafe(no_mangle)]
616extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetJsonbValue<'a>(
617 env: EnvParam<'a>,
618 pointer: Pointer<'a, JavaBindingIterator<'a>>,
619 idx: jint,
620) -> JString<'a> {
621 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
622 let jsonb = pointer
623 .as_ref()
624 .datum_at(idx as usize)
625 .unwrap()
626 .into_jsonb()
627 .to_string();
628 Ok(env.new_string(jsonb)?)
629 })
630}
631
632#[unsafe(no_mangle)]
633extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimestampValue<'a>(
634 env: EnvParam<'a>,
635 pointer: Pointer<'a, JavaBindingIterator<'a>>,
636 idx: jint,
637) -> JObject<'a> {
638 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
639 let value = pointer
640 .as_ref()
641 .datum_at(idx as usize)
642 .unwrap()
643 .into_timestamp();
644
645 let sig = gen_jni_sig!(java.time.LocalDateTime of(int year, int month, int dayOfMonth, int hour, int minute, int second, int nanoOfSecond));
646
647 let (timestamp_class_ref, constructor) = pointer
648 .as_ref()
649 .class_cache
650 .timestamp_ctor
651 .get_or_try_init(|| {
652 let cls = env.find_class(gen_class_name!(java.time.LocalDateTime))?;
653 let init_method = env.get_static_method_id(&cls, "of", sig)?;
654 Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
655 })?;
656 unsafe {
657 let JValueOwned::Object(timestamp_obj) = env.call_static_method_unchecked(
658 <&JClass<'_>>::from(timestamp_class_ref.as_obj()),
659 *constructor,
660 ReturnType::Object,
661 &[
662 jvalue { i: value.0.year() },
663 jvalue {
664 i: value.0.month() as i32,
665 },
666 jvalue {
667 i: value.0.day() as i32,
668 },
669 jvalue {
670 i: value.0.hour() as i32,
671 },
672 jvalue {
673 i: value.0.minute() as i32,
674 },
675 jvalue {
676 i: value.0.second() as i32,
677 },
678 jvalue {
679 i: value.0.nanosecond() as i32,
680 },
681 ],
682 )?
683 else {
684 return Err(BindingError::from(jni::errors::Error::MethodNotFound {
685 name: "of".to_owned(),
686 sig: sig.into(),
687 }));
688 };
689 Ok(timestamp_obj)
690 }
691 })
692}
693
/// JNI entry point: returns the timestamptz at column `idx` of the current row as a
/// `java.time.OffsetDateTime`.
///
/// The object is built in two steps: `Instant.ofEpochSecond(seconds, nanos)` followed by
/// `OffsetDateTime.ofInstant(instant, ZoneOffset.UTC)`. The classes, method ids and the
/// `UTC` object are looked up once per iterator and cached in its class cache.
///
/// The datum is unwrapped, so a NULL value panics inside the closure; `execute_and_catch`
/// reports such panics to Java.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimestamptzValue<'a>(
    env: EnvParam<'a>,
    pointer: Pointer<'a, JavaBindingIterator<'a>>,
    idx: jint,
) -> JObject<'a> {
    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
        let value = pointer
            .as_ref()
            .datum_at(idx as usize)
            .unwrap()
            .into_timestamptz();

        let instant_sig =
            gen_jni_sig!(java.time.Instant ofEpochSecond(long epochSecond, long nanoAdjustment));

        // Step 1: resolve (or reuse) `Instant.ofEpochSecond` and build the `Instant`.
        let (instant_class_ref, instant_constructor) = pointer
            .as_ref()
            .class_cache
            .instant_ctor
            .get_or_try_init(|| {
                let cls = env.find_class(gen_class_name!(java.time.Instant))?;
                let init_method = env.get_static_method_id(&cls, "ofEpochSecond", instant_sig)?;
                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
            })?;
        let instant_obj = unsafe {
            let JValueOwned::Object(instant_obj) = env.call_static_method_unchecked(
                <&JClass<'_>>::from(instant_class_ref.as_obj()),
                *instant_constructor,
                ReturnType::Object,
                &[
                    jvalue {
                        j: value.timestamp(),
                    },
                    jvalue {
                        j: value.timestamp_subsec_nanos() as i64,
                    },
                ],
            )?
            else {
                // The call returned something other than an object.
                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
                    name: "ofEpochSecond".to_owned(),
                    sig: instant_sig.into(),
                }));
            };
            instant_obj
        };

        // Step 2: resolve (or reuse) the `ZoneOffset.UTC` static field.
        let utc_ref = pointer.as_ref().class_cache.utc.get_or_try_init(|| {
            let cls = env.find_class(gen_class_name!(java.time.ZoneOffset))?;
            let utc = env
                .get_static_field(&cls, "UTC", gen_jni_type_sig!(java.time.ZoneOffset))?
                .l()?;
            env.new_global_ref(utc)
        })?;

        let sig = gen_jni_sig!(java.time.OffsetDateTime ofInstant(java.time.Instant instant, java.time.ZoneId zone));

        // Step 3: resolve (or reuse) `OffsetDateTime.ofInstant` and build the result.
        let (timestamptz_class_ref, constructor) = pointer
            .as_ref()
            .class_cache
            .timestamptz_ctor
            .get_or_try_init(|| {
                let cls = env.find_class(gen_class_name!(java.time.OffsetDateTime))?;
                let init_method = env.get_static_method_id(&cls, "ofInstant", sig)?;
                Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
            })?;
        unsafe {
            let JValueOwned::Object(timestamptz_obj) = env.call_static_method_unchecked(
                <&JClass<'_>>::from(timestamptz_class_ref.as_obj()),
                *constructor,
                ReturnType::Object,
                &[
                    jvalue {
                        l: instant_obj.as_raw(),
                    },
                    jvalue {
                        l: utc_ref.as_obj().as_raw(),
                    },
                ],
            )?
            else {
                // The call returned something other than an object.
                return Err(BindingError::from(jni::errors::Error::MethodNotFound {
                    name: "ofInstant".to_owned(),
                    sig: sig.into(),
                }));
            };
            Ok(timestamptz_obj)
        }
    })
}
785
786#[unsafe(no_mangle)]
787extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDecimalValue<'a>(
788 env: EnvParam<'a>,
789 pointer: Pointer<'a, JavaBindingIterator<'a>>,
790 idx: jint,
791) -> JObject<'a> {
792 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
793 let decimal_value = pointer
794 .as_ref()
795 .datum_at(idx as usize)
796 .unwrap()
797 .into_decimal();
798
799 match decimal_value {
800 Decimal::NaN | Decimal::NegativeInf | Decimal::PositiveInf => {
801 return Ok(JObject::null());
802 }
803 Decimal::Normalized(_) => {}
804 };
805
806 let value = decimal_value.to_string();
807 let string_value = env.new_string(value)?;
808 let (decimal_class_ref, constructor) = pointer
809 .as_ref()
810 .class_cache
811 .big_decimal_ctor
812 .get_or_try_init(|| {
813 let cls = env.find_class("java/math/BigDecimal")?;
814 let init_method = env.get_method_id(&cls, "<init>", "(Ljava/lang/String;)V")?;
815 Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
816 })?;
817 unsafe {
818 let decimal_class = <&JClass<'_>>::from(decimal_class_ref.as_obj());
819 let date_obj = env.new_object_unchecked(
820 decimal_class,
821 *constructor,
822 &[jvalue {
823 l: string_value.into_raw(),
824 }],
825 )?;
826 Ok(date_obj)
827 }
828 })
829}
830
831#[unsafe(no_mangle)]
832extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue<'a>(
833 env: EnvParam<'a>,
834 pointer: Pointer<'a, JavaBindingIterator<'a>>,
835 idx: jint,
836) -> JObject<'a> {
837 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
838 let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_date();
839 let epoch_days = (value.0 - DateTime::UNIX_EPOCH.date_naive()).num_days();
840
841 let sig = gen_jni_sig!(java.time.LocalDate ofEpochDay(long));
842
843 let (date_class_ref, constructor) =
844 pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| {
845 let cls = env.find_class(gen_class_name!(java.time.LocalDate))?;
846 let init_method = env.get_static_method_id(&cls, "ofEpochDay", sig)?;
847 Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
848 })?;
849 unsafe {
850 let JValueOwned::Object(date_obj) = env.call_static_method_unchecked(
851 <&JClass<'_>>::from(date_class_ref.as_obj()),
852 *constructor,
853 ReturnType::Object,
854 &[jvalue { j: epoch_days }],
855 )?
856 else {
857 return Err(BindingError::from(jni::errors::Error::MethodNotFound {
858 name: "ofEpochDay".to_owned(),
859 sig: sig.into(),
860 }));
861 };
862 Ok(date_obj)
863 }
864 })
865}
866
867#[unsafe(no_mangle)]
868extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue<'a>(
869 env: EnvParam<'a>,
870 pointer: Pointer<'a, JavaBindingIterator<'a>>,
871 idx: jint,
872) -> JObject<'a> {
873 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
874 let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_time();
875
876 let sig = gen_jni_sig!(java.time.LocalTime of(int hour, int minute, int second, int nanoOfSecond));
877
878 let (time_class_ref, constructor) =
879 pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| {
880 let cls = env.find_class(gen_class_name!(java.time.LocalTime))?;
881 let init_method = env.get_static_method_id(&cls, "of", sig)?;
882 Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
883 })?;
884 unsafe {
885 let JValueOwned::Object(time_obj) = env.call_static_method_unchecked(
886 <&JClass<'_>>::from(time_class_ref.as_obj()),
887 *constructor,
888 ReturnType::Object,
889 &[
890 jvalue {
891 i: value.0.hour() as i32,
892 },
893 jvalue {
894 i: value.0.minute() as i32,
895 },
896 jvalue {
897 i: value.0.second() as i32,
898 },
899 jvalue {
900 i: value.0.nanosecond() as i32,
901 },
902 ],
903 )?
904 else {
905 return Err(BindingError::from(jni::errors::Error::MethodNotFound {
906 name: "of".to_owned(),
907 sig: sig.into(),
908 }));
909 };
910 Ok(time_obj)
911 }
912 })
913}
914
915#[unsafe(no_mangle)]
916extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetByteaValue<'a>(
917 env: EnvParam<'a>,
918 pointer: Pointer<'a, JavaBindingIterator<'a>>,
919 idx: jint,
920) -> JByteArray<'a> {
921 execute_and_catch(env, move |env: &mut EnvParam<'_>| {
922 let bytes = pointer
923 .as_ref()
924 .datum_at(idx as usize)
925 .unwrap()
926 .into_bytea();
927 Ok(env.byte_array_from_slice(bytes)?)
928 })
929}
930
/// JNI entry point: returns the list at column `idx` of the current row as a Java object
/// array whose element class is `class`.
///
/// Elements of type int16/int32/int64/float32/float64 are boxed through the matching
/// `valueOf` static method, and strings become Java strings. NULL elements and elements of
/// any other type are stored as `null`.
///
/// The datum is unwrapped, so a NULL list panics inside the closure; `execute_and_catch`
/// reports such panics to Java.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValue<'a>(
    env: EnvParam<'a>,
    pointer: Pointer<'a, JavaBindingIterator<'a>>,
    idx: jint,
    class: JClass<'a>,
) -> JObject<'a> {
    execute_and_catch(env, move |env: &mut EnvParam<'_>| {
        let elems = pointer
            .as_ref()
            .datum_at(idx as usize)
            .unwrap()
            .into_list()
            .iter();

        // Every slot starts out as `null` and is overwritten below.
        let jarray = env.new_object_array(elems.len() as jsize, &class, JObject::null())?;

        for (i, ele) in elems.enumerate() {
            let index = i as jsize;
            match ele {
                None => env.set_object_array_element(&jarray, i as jsize, JObject::null())?,
                Some(val) => match val {
                    ScalarRefImpl::Int16(v) => {
                        let o = call_static_method!(
                            env,
                            {Short},
                            {Short valueOf(short s)},
                            v
                        )?;
                        env.set_object_array_element(&jarray, index, &o)?;
                    }
                    ScalarRefImpl::Int32(v) => {
                        let o = call_static_method!(
                            env,
                            {Integer},
                            {Integer valueOf(int i)},
                            v
                        )?;
                        env.set_object_array_element(&jarray, index, &o)?;
                    }
                    ScalarRefImpl::Int64(v) => {
                        let o = call_static_method!(
                            env,
                            {Long},
                            {Long valueOf(long l)},
                            v
                        )?;
                        env.set_object_array_element(&jarray, index, &o)?;
                    }
                    ScalarRefImpl::Float32(v) => {
                        let o = call_static_method!(
                            env,
                            {Float},
                            {Float valueOf(float f)},
                            v.into_inner()
                        )?;
                        env.set_object_array_element(&jarray, index, &o)?;
                    }
                    ScalarRefImpl::Float64(v) => {
                        let o = call_static_method!(
                            env,
                            {Double},
                            {Double valueOf(double d)},
                            v.into_inner()
                        )?;
                        env.set_object_array_element(&jarray, index, &o)?;
                    }
                    ScalarRefImpl::Utf8(v) => {
                        let obj = env.new_string(v)?;
                        env.set_object_array_element(&jarray, index, obj)?
                    }
                    // Any other element type is not converted and is stored as `null`.
                    _ => env.set_object_array_element(&jarray, index, JObject::null())?,
                },
            }
        }
        // Re-wrap the array handle as a plain object handle for the return type.
        let output = unsafe { JObject::from_raw(jarray.into_raw()) };
        Ok(output)
    })
}
1011
/// Sending half of a Tokio mpsc channel whose items are `anyhow::Result<T>`, so an error
/// can be sent through the channel instead of a value.
pub type JniSenderType<T> = Sender<anyhow::Result<T>>;
/// Receiving half of a Tokio mpsc channel carrying plain `T` values.
pub type JniReceiverType<T> = Receiver<T>;
1014
1015#[unsafe(no_mangle)]
1020extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>(
1021 env: EnvParam<'a>,
1022 channel: Pointer<'a, JniSenderType<GetEventStreamResponse>>,
1023 msg: JByteArray<'a>,
1024) -> jboolean {
1025 execute_and_catch(env, move |env| {
1026 if msg.is_null() {
1028 if channel.as_ref().is_closed() {
1029 return Ok(JNI_FALSE);
1030 } else {
1031 return Ok(JNI_TRUE);
1032 }
1033 }
1034
1035 let get_event_stream_response: GetEventStreamResponse =
1036 Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1037
1038 match channel
1039 .as_ref()
1040 .blocking_send(Ok(get_event_stream_response))
1041 {
1042 Ok(_) => Ok(JNI_TRUE),
1043 Err(e) => {
1044 tracing::info!(error = %e.as_report(), "send error");
1045 Ok(JNI_FALSE)
1046 }
1047 }
1048 })
1049}
1050
1051#[unsafe(no_mangle)]
1052extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceErrorToChannel<'a>(
1053 env: EnvParam<'a>,
1054 channel: Pointer<'a, JniSenderType<GetEventStreamResponse>>,
1055 msg: JString<'a>,
1056) -> jboolean {
1057 execute_and_catch(env, move |env| {
1058 let ret = env.get_string(&msg);
1059 match ret {
1060 Ok(str) => {
1061 let err_msg: String = str.into();
1062 match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) {
1063 Ok(_) => Ok(JNI_TRUE),
1064 Err(e) => {
1065 tracing::info!(error = ?e.as_report(), "send error");
1066 Ok(JNI_FALSE)
1067 }
1068 }
1069 }
1070 Err(err) => {
1071 if msg.is_null() {
1072 tracing::warn!("source error message is null");
1073 Ok(JNI_TRUE)
1074 } else {
1075 tracing::error!(error = ?err.as_report(), "source error message should be a java string");
1076 Ok(JNI_FALSE)
1077 }
1078 }
1079 }
1080 })
1081}
1082
/// JNI entry point: drops the channel sender behind `channel`.
///
/// `release` asserts that the pointer is non-null. Unlike most entry points in this file,
/// the call is not wrapped in `execute_and_catch`.
#[unsafe(no_mangle)]
extern "system" fn Java_com_risingwave_java_binding_Binding_cdcSourceSenderClose(
    _env: EnvParam<'_>,
    channel: OwnedPointer<JniSenderType<GetEventStreamResponse>>,
) {
    channel.release();
}
1090
/// A sink writer request as handed to Java: either a protobuf request, which is passed to
/// Java in serialized form, or a `StreamChunk`, which is passed as an owned pointer together
/// with its epoch and batch id.
pub enum JniSinkWriterStreamRequest {
    /// A protobuf request, serialized before it is handed to Java.
    PbRequest(SinkWriterStreamRequest),
    /// A chunk handed to Java by pointer, without serialization.
    Chunk {
        epoch: u64,
        batch_id: u64,
        chunk: StreamChunk,
    },
}
1099
1100impl From<SinkWriterStreamRequest> for JniSinkWriterStreamRequest {
1101 fn from(value: SinkWriterStreamRequest) -> Self {
1102 Self::PbRequest(value)
1103 }
1104}
1105
1106#[unsafe(no_mangle)]
1107pub extern "system" fn Java_com_risingwave_java_binding_Binding_recvSinkWriterRequestFromChannel<
1108 'a,
1109>(
1110 env: EnvParam<'a>,
1111 mut channel: Pointer<'a, JniReceiverType<JniSinkWriterStreamRequest>>,
1112) -> JObject<'a> {
1113 execute_and_catch(env, move |env| match channel.as_mut().blocking_recv() {
1114 Some(msg) => {
1115 let obj = match msg {
1116 JniSinkWriterStreamRequest::PbRequest(request) => {
1117 let bytes = env.byte_array_from_slice(&Message::encode_to_vec(&request))?;
1118 let jobj = JObject::from(bytes);
1119 call_static_method!(
1120 env,
1121 {com.risingwave.java.binding.JniSinkWriterStreamRequest},
1122 {com.risingwave.java.binding.JniSinkWriterStreamRequest fromSerializedPayload(byte[] payload)},
1123 &jobj
1124 )?
1125 }
1126 JniSinkWriterStreamRequest::Chunk {
1127 epoch,
1128 batch_id,
1129 chunk,
1130 } => {
1131 let pointer = Box::into_raw(Box::new(chunk));
1132 call_static_method!(
1133 env,
1134 {com.risingwave.java.binding.JniSinkWriterStreamRequest},
1135 {com.risingwave.java.binding.JniSinkWriterStreamRequest fromStreamChunkOwnedPointer(long pointer, long epoch, long batchId)},
1136 pointer as u64, epoch, batch_id
1137 )
1138 .inspect_err(|_| unsafe {
1139 drop(Box::from_raw(pointer));
1141 })?
1142 }
1143 };
1144 Ok(obj)
1145 }
1146 None => Ok(JObject::null()),
1147 })
1148}
1149
1150#[unsafe(no_mangle)]
1151pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterResponseToChannel<
1152 'a,
1153>(
1154 env: EnvParam<'a>,
1155 channel: Pointer<'a, JniSenderType<SinkWriterStreamResponse>>,
1156 msg: JByteArray<'a>,
1157) -> jboolean {
1158 execute_and_catch(env, move |env| {
1159 let sink_writer_stream_response: SinkWriterStreamResponse =
1160 Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1161
1162 match channel
1163 .as_ref()
1164 .blocking_send(Ok(sink_writer_stream_response))
1165 {
1166 Ok(_) => Ok(JNI_TRUE),
1167 Err(e) => {
1168 tracing::info!(error = ?e.as_report(), "send error");
1169 Ok(JNI_FALSE)
1170 }
1171 }
1172 })
1173}
1174
1175#[unsafe(no_mangle)]
1176pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterErrorToChannel<'a>(
1177 env: EnvParam<'a>,
1178 channel: Pointer<'a, Sender<anyhow::Result<SinkWriterStreamResponse>>>,
1179 msg: JString<'a>,
1180) -> jboolean {
1181 execute_and_catch(env, move |env| {
1182 let ret = env.get_string(&msg);
1183 match ret {
1184 Ok(str) => {
1185 let err_msg: String = str.into();
1186 match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) {
1187 Ok(_) => Ok(JNI_TRUE),
1188 Err(e) => {
1189 tracing::info!(error = ?e.as_report(), "send error");
1190 Ok(JNI_FALSE)
1191 }
1192 }
1193 }
1194 Err(err) => {
1195 if msg.is_null() {
1196 tracing::warn!("sink error message is null");
1197 Ok(JNI_TRUE)
1198 } else {
1199 tracing::error!(error = ?err.as_report(), "sink error message should be a java string");
1200 Ok(JNI_FALSE)
1201 }
1202 }
1203 }
1204 })
1205}
1206
1207#[unsafe(no_mangle)]
1208pub extern "system" fn Java_com_risingwave_java_binding_Binding_recvSinkCoordinatorRequestFromChannel<
1209 'a,
1210>(
1211 env: EnvParam<'a>,
1212 mut channel: Pointer<'a, JniReceiverType<SinkCoordinatorStreamRequest>>,
1213) -> JByteArray<'a> {
1214 execute_and_catch(env, move |env| match channel.as_mut().blocking_recv() {
1215 Some(msg) => {
1216 let bytes = env
1217 .byte_array_from_slice(&Message::encode_to_vec(&msg))
1218 .unwrap();
1219 Ok(bytes)
1220 }
1221 None => Ok(JObject::null().into()),
1222 })
1223}
1224
1225#[unsafe(no_mangle)]
1226pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkCoordinatorResponseToChannel<
1227 'a,
1228>(
1229 env: EnvParam<'a>,
1230 channel: Pointer<'a, JniSenderType<SinkCoordinatorStreamResponse>>,
1231 msg: JByteArray<'a>,
1232) -> jboolean {
1233 execute_and_catch(env, move |env| {
1234 let sink_coordinator_stream_response: SinkCoordinatorStreamResponse =
1235 Message::decode(to_guarded_slice(&msg, env)?.deref())?;
1236
1237 match channel
1238 .as_ref()
1239 .blocking_send(Ok(sink_coordinator_stream_response))
1240 {
1241 Ok(_) => Ok(JNI_TRUE),
1242 Err(e) => {
1243 tracing::info!(error = ?e.as_report(), "send error");
1244 Ok(JNI_FALSE)
1245 }
1246 }
1247 })
1248}
1249
#[cfg(test)]
mod tests {
    use risingwave_common::types::Timestamptz;

    /// Checks that parsing a timestamp with a `+08:00` offset yields the expected number of
    /// microseconds since the Unix epoch: `2023-06-01 09:45:00+08:00` is
    /// `2023-06-01 01:45:00` UTC, i.e. 1_685_583_900 seconds.
    #[test]
    fn test_timestamptz_to_i64() {
        assert_eq!(
            "2023-06-01 09:45:00+08:00".parse::<Timestamptz>().unwrap(),
            Timestamptz::from_micros(1_685_583_900_000_000)
        );
    }
}