1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
36use risingwave_jni_core::{
37 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41 Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47 sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52 SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use sea_orm::DatabaseConnection;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::SinkCommittedEpochSubscriber;
64use super::elasticsearch_opensearch::elasticsearch_converter::{
65 StreamChunkConverter, is_remote_es_sink,
66};
67use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
68use crate::connector_common::IcebergSinkCompactionUpdate;
69use crate::enforce_secret::EnforceSecret;
70use crate::error::ConnectorResult;
71use crate::sink::coordinate::CoordinatedLogSinker;
72use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
73use crate::sink::writer::SinkWriter;
74use crate::sink::{
75 DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
76 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
77};
78
/// Declares the marker types for the remote sinks defined in this file.
///
/// For every `{ Variant, SinkAlias, "name", [ "prop", ... ] }` entry it generates a unit struct
/// `Variant` that implements `RemoteSinkTrait` (with `SINK_NAME = "name"`) and `EnforceSecret`
/// (with the bracketed strings as `ENFORCE_SECRET_PROPERTIES`), plus the type alias
/// `SinkAlias = RemoteSink<Variant>`.
macro_rules! def_remote_sink {
    // No arguments: expand the built-in list of remote sinks.
    () => {
        def_remote_sink! {
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // A single entry that keeps the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A single entry with a trailing `|desc| body` that overrides `default_sink_decouple`.
    // NOTE(review): the generated method takes `&SinkDesc`, while
    // `RemoteSinkTrait::default_sink_decouple` as declared in this file takes no arguments, and
    // no entry of the built-in list uses this arm — confirm whether it is still needed.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Several entries: expand the first one, then recurse on the remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Anything else is reported as a compile error that echoes the offending tokens.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}

// Instantiate the built-in remote sinks.
def_remote_sink!();
132
133pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
134 const SINK_NAME: &'static str;
135 fn default_sink_decouple() -> bool {
136 true
137 }
138}
139
140#[derive(Debug)]
141pub struct RemoteSink<R: RemoteSinkTrait> {
142 param: SinkParam,
143 _phantom: PhantomData<R>,
144}
145
146impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
147 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
148}
149
150impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
151 type Error = SinkError;
152
153 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
154 Ok(Self {
155 param,
156 _phantom: PhantomData,
157 })
158 }
159}
160
161impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
162 type Coordinator = DummySinkCommitCoordinator;
163 type LogSinker = RemoteLogSinker;
164
165 const SINK_NAME: &'static str = R::SINK_NAME;
166
167 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
168 match user_specified {
169 SinkDecouple::Default => Ok(R::default_sink_decouple()),
170 SinkDecouple::Enable => Ok(true),
171 SinkDecouple::Disable => Ok(false),
172 }
173 }
174
175 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
176 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
177 }
178
179 async fn validate(&self) -> Result<()> {
180 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
181 Ok(())
182 }
183}
184
185async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
186 if is_remote_es_sink(sink_name)
192 && param.downstream_pk.len() > 1
193 && !param.properties.contains_key(ES_OPTION_DELIMITER)
194 {
195 bail!("Es sink only supports single pk or pk with delimiter option");
196 }
197 param.columns.iter().try_for_each(|col| {
199 match &col.data_type {
200 DataType::Int16
201 | DataType::Int32
202 | DataType::Int64
203 | DataType::Float32
204 | DataType::Float64
205 | DataType::Boolean
206 | DataType::Decimal
207 | DataType::Timestamp
208 | DataType::Timestamptz
209 | DataType::Varchar
210 | DataType::Date
211 | DataType::Time
212 | DataType::Interval
213 | DataType::Jsonb
214 | DataType::Bytea => Ok(()),
215 DataType::List(list) => {
216 if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
217 Ok(())
218 } else{
219 Err(SinkError::Remote(anyhow!(
220 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
221 col.name,
222 col.data_type,
223 )))
224 }
225 },
226 DataType::Struct(_) => {
227 if is_remote_es_sink(sink_name){
228 Ok(())
229 }else{
230 Err(SinkError::Remote(anyhow!(
231 "Only Es sink supports struct, got {:?}: {:?}",
232 col.name,
233 col.data_type,
234 )))
235 }
236 },
237 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
238 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
239 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
240 col.name,
241 col.data_type,
242 )))}})?;
243
244 let jvm = JVM.get_or_init()?;
245 let sink_param = param.to_proto();
246
247 spawn_blocking(move || -> anyhow::Result<()> {
248 execute_with_jni_env(jvm, |env| {
249 let validate_sink_request = ValidateSinkRequest {
250 sink_param: Some(sink_param),
251 };
252 let validate_sink_request_bytes =
253 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
254
255 let validate_sink_response_bytes = call_static_method!(
256 env,
257 {com.risingwave.connector.JniSinkValidationHandler},
258 {byte[] validate(byte[] validateSourceRequestBytes)},
259 &validate_sink_request_bytes
260 )?;
261
262 let validate_sink_response: ValidateSinkResponse = Message::decode(
263 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
264 )?;
265
266 validate_sink_response.error.map_or_else(
267 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
269 )
270 })
271 })
272 .await
273 .context("JoinHandle returns error")??;
274
275 Ok(())
276}
277
/// Log sinker that forwards log-store items to the JVM sink writer and truncates the log
/// according to the responses it gets back.
pub struct RemoteLogSinker {
    /// Sends chunks and barriers to the JVM sink writer.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Responses coming back from the JVM sink writer.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each chunk before it is sent.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log.
    sink_writer_metrics: SinkWriterMetrics,
}
284
285impl RemoteLogSinker {
286 async fn new(
287 sink_param: SinkParam,
288 writer_param: SinkWriterParam,
289 sink_name: &str,
290 ) -> Result<Self> {
291 let sink_proto = sink_param.to_proto();
292 let payload_schema = if is_remote_es_sink(sink_name) {
293 let columns = vec![
294 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
295 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
296 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
297 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
298 ];
299 Some(TableSchema {
300 columns,
301 pk_indices: vec![],
302 })
303 } else {
304 sink_proto.table_schema.clone()
305 };
306
307 let SinkWriterStreamHandle {
308 request_sender,
309 response_stream,
310 } = EmbeddedConnectorClient::new()?
311 .start_sink_writer_stream(payload_schema, sink_proto)
312 .await?;
313
314 Ok(RemoteLogSinker {
315 request_sender,
316 response_stream,
317 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
318 stream_chunk_converter: StreamChunkConverter::new(
319 sink_name,
320 sink_param.schema(),
321 &sink_param.downstream_pk,
322 &sink_param.properties,
323 sink_param.sink_type.is_append_only(),
324 )?,
325 })
326 }
327}
328
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the sink: reads items from `log_reader`, sends them to the JVM writer, and
    /// truncates the log when the writer reports an offset as written or committed.
    ///
    /// Two futures are raced: one relays responses from the JVM response stream into an
    /// in-process channel, the other consumes both that channel and the log reader. The
    /// function never returns successfully; it ends with the error of whichever future
    /// finishes first.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Relay channel between the response poller and the consumer loop.
        let (response_tx, mut response_rx) = unbounded_channel();

        // Forwards every response into the channel; a stream error or the end of the stream
        // terminates this future with an error.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Matches a persisted offset reported by the writer against the queue of sent
            // offsets and truncates the log reader at that offset.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                // Drop queued offsets that precede the persisted one.
                // NOTE(review): presumably the writer does not acknowledge every sent offset —
                // confirm.
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // The next queued offset must be exactly the persisted one.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                // Only barriers queued with a start time (checkpoint barriers) are timed.
                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Last offset sent to the writer, used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent but not yet acknowledged: pushed at the back, popped from the front.
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for either a writer response or the next log item, whichever is first.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the relay channel was closed by the poller.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk was written: truncate at that chunk.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // An epoch was committed: truncate at that barrier. Metadata is
                            // not expected here and is only logged.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response is a protocol violation.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                // Convert the chunk before sending it to the writer.
                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                // Chunks are not timed, hence no start time.
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Checkpoint barriers record when the commit request was
                                // issued so the commit duration can be observed later.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Return the result of whichever future completes first.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
524
525#[derive(Debug)]
526pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
527 param: SinkParam,
528 _phantom: PhantomData<R>,
529}
530
531impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
532 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
533}
534
535impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
536 type Error = SinkError;
537
538 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
539 Ok(Self {
540 param,
541 _phantom: PhantomData,
542 })
543 }
544}
545
546impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
547 type Coordinator = RemoteCoordinator;
548 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
549
550 const SINK_NAME: &'static str = R::SINK_NAME;
551
552 async fn validate(&self) -> Result<()> {
553 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
554 Ok(())
555 }
556
557 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
558 let metrics = SinkWriterMetrics::new(&writer_param);
559 CoordinatedLogSinker::new(
560 &writer_param,
561 self.param.clone(),
562 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
563 NonZero::new(1).unwrap(),
564 )
565 .await
566 }
567
568 fn is_coordinated_sink(&self) -> bool {
569 true
570 }
571
572 async fn new_coordinator(
573 &self,
574 _db: DatabaseConnection,
575 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
576 ) -> Result<Self::Coordinator> {
577 RemoteCoordinator::new::<R>(self.param.clone()).await
578 }
579}
580
/// Sink writer that forwards chunks and barriers to the JVM sink writer and returns the commit
/// metadata it reports.
pub struct CoordinatedRemoteSinkWriter {
    /// Epoch set by `begin_epoch`; `None` until it is called for the first time.
    epoch: Option<u64>,
    /// Id given to the next chunk; incremented after every successfully sent chunk.
    batch_id: u64,
    /// Request/response stream to the JVM sink writer.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated on every written batch.
    metrics: SinkWriterMetrics,
}
587
588impl CoordinatedRemoteSinkWriter {
589 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
590 let sink_proto = param.to_proto();
591 let stream_handle = EmbeddedConnectorClient::new()?
592 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
593 .await?;
594
595 Ok(Self {
596 epoch: None,
597 batch_id: 0,
598 stream_handle,
599 metrics,
600 })
601 }
602
603 #[cfg(test)]
604 fn for_test(
605 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
606 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
607 ) -> CoordinatedRemoteSinkWriter {
608 use futures::StreamExt;
609
610 let stream_handle = SinkWriterStreamHandle::for_test(
611 request_sender,
612 ReceiverStream::new(response_receiver)
613 .map_err(RpcError::from)
614 .boxed(),
615 );
616
617 CoordinatedRemoteSinkWriter {
618 epoch: None,
619 batch_id: 0,
620 stream_handle,
621 metrics: SinkWriterMetrics::for_test(),
622 }
623 }
624}
625
626#[async_trait]
627impl SinkWriter for CoordinatedRemoteSinkWriter {
628 type CommitMetadata = Option<SinkMetadata>;
629
630 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
631 let cardinality = chunk.cardinality();
632 self.metrics
633 .connector_sink_rows_received
634 .inc_by(cardinality as _);
635
636 let epoch = self.epoch.ok_or_else(|| {
637 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
638 })?;
639 let batch_id = self.batch_id;
640 self.stream_handle
641 .request_sender
642 .send_request(JniSinkWriterStreamRequest::Chunk {
643 chunk,
644 epoch,
645 batch_id,
646 })
647 .await?;
648 self.batch_id += 1;
649 Ok(())
650 }
651
652 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
653 self.epoch = Some(epoch);
654 Ok(())
655 }
656
657 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
658 let epoch = self.epoch.ok_or_else(|| {
659 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
660 })?;
661 if is_checkpoint {
662 let rsp = self.stream_handle.commit(epoch).await?;
664 rsp.metadata
665 .ok_or_else(|| {
666 SinkError::Remote(anyhow!(
667 "get none metadata in commit response for coordinated sink writer"
668 ))
669 })
670 .map(Some)
671 } else {
672 self.stream_handle.barrier(epoch).await?;
673 Ok(None)
674 }
675 }
676}
677
/// Commit coordinator that forwards commits to the JVM sink coordinator.
pub struct RemoteCoordinator {
    /// Request/response stream to the JVM sink coordinator.
    stream_handle: SinkCoordinatorStreamHandle,
}
681
682impl RemoteCoordinator {
683 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
684 let stream_handle = EmbeddedConnectorClient::new()?
685 .start_sink_coordinator_stream(param.clone())
686 .await?;
687
688 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
689
690 Ok(RemoteCoordinator { stream_handle })
691 }
692}
693
694#[async_trait]
695impl SinkCommitCoordinator for RemoteCoordinator {
696 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
697 Ok(None)
698 }
699
700 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
701 Ok(self.stream_handle.commit(epoch, metadata).await?)
702 }
703}
704
/// Client for the connector code running inside the embedded JVM.
struct EmbeddedConnectorClient {
    /// The JVM that connector worker threads are run against.
    jvm: &'static JavaVM,
}
708
impl EmbeddedConnectorClient {
    /// Obtains the JVM via `JVM.get_or_init()`, failing with added context when that fails.
    fn new() -> Result<Self> {
        let jvm = JVM
            .get_or_init()
            .context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink writer stream backed by a JVM worker thread.
    ///
    /// The stream is initialised with a `StartSink` request carrying `sink_proto` and
    /// `payload_schema`. The first response must be a `Start` response; anything else is
    /// reported as an internal error.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // Hand the request receiver to a JVM worker thread and expose its responses as a
            // stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a sink coordinator stream backed by a JVM worker thread.
    ///
    /// The stream is initialised with a `StartCoordinator` request carrying `param`. The
    /// first response must be a `Start` response; anything else is reported as an internal
    /// error.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            // Hand the request receiver to a JVM worker thread and expose its responses as a
            // stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Spawns an OS thread that runs the static Java method `class_name::method_name` and
    /// returns the receiver on which its responses arrive.
    ///
    /// The Java method receives the addresses of `request_rx` and of the response sender as
    /// `long` arguments. An error returned by the Java call is only logged; an error from
    /// `execute_with_jni_env` itself is sent to the response channel.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                // Both channel ends are owned by this thread's closure, so the pointers passed
                // below stay valid while the static method runs.
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Best effort: the send result is ignored, e.g. when the receiver is already gone.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
825
/// Tests for `CoordinatedRemoteSinkWriter`, driven through in-process channels instead of a JVM.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// Without a prior `begin_epoch`, both `barrier` and `write_batch` must fail and must not
    /// send any request.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
            ",
        );

        // Committing without an epoch must return an error rather than time out.
        assert!(
            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
                .await
                .expect("test failed: should not commit without epoch")
                .is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // Writing without an epoch must return an error rather than time out.
        assert!(
            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
                .await
                .expect("test failed: should not write without epoch")
                .is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Walks through two epochs and checks the requests the writer emits for chunks and for a
    /// non-checkpoint barrier, as well as the epoch and batch id bookkeeping.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
            ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
            ",
        );

        // First epoch.
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        // The first chunk is sent with batch id 0, after which the counter advances to 1.
        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2022);
                assert_eq!(batch_id, 0);
                assert_eq!(chunk, chunk_a);
            }
            _ => panic!("test failed: failed to construct write request"),
        }

        // NOTE(review): a commit response is queued here although the barrier below is a
        // non-checkpoint one — presumably it is never read; confirm.
        response_sender
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let commit_request = request_receiver.recv().await.unwrap();
        match commit_request {
            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
                request:
                    Some(Request::Barrier(Barrier {
                        epoch,
                        is_checkpoint: false,
                    })),
            }) => {
                assert_eq!(epoch, 2022);
            }
            _ => panic!("test failed: failed to construct sync request "),
        };

        // Second epoch.
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        // The batch id keeps counting across epochs.
        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2023);
                assert_eq!(batch_id, 1);
                assert_eq!(chunk, chunk_b);
            }
            _ => panic!("test failed: failed to construct write request"),
        }
    }
}