1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
36use risingwave_jni_core::{
37 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41 Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47 sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52 SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use sea_orm::DatabaseConnection;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::SinkCommittedEpochSubscriber;
64use super::elasticsearch_opensearch::elasticsearch_converter::{
65 StreamChunkConverter, is_remote_es_sink,
66};
67use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
68use crate::connector_common::IcebergSinkCompactionUpdate;
69use crate::enforce_secret::EnforceSecret;
70use crate::error::ConnectorResult;
71use crate::sink::coordinate::CoordinatedLogSinker;
72use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
73use crate::sink::writer::SinkWriter;
74use crate::sink::{
75 DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
76 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
77};
78
/// Generates, for every remote sink entry, a unit marker struct, its `RemoteSinkTrait` and
/// `EnforceSecret` impls, and a `RemoteSink<..>` type alias.
///
/// An entry is written as `{ Variant, SinkTypeAlias, "sink_name", [ "secret.prop", .. ] }`,
/// optionally followed by `, |desc| body`.
macro_rules! def_remote_sink {
    // No arguments: expand the list of remote sinks declared by this file.
    () => {
        def_remote_sink! {
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // A single entry that keeps the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A single entry that overrides `default_sink_decouple` with `|desc| body`.
    // NOTE(review): this arm emits `default_sink_decouple($desc: &SinkDesc)`, while the
    // `RemoteSinkTrait::default_sink_decouple` declared in this file takes no parameter, and no
    // entry in the zero-argument arm uses this form — confirm whether the arm is still needed.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A list of brace-delimited entries: expand the first entry on its own, then recurse on the
    // remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Fallback arm: emits a compile error that echoes the offending tokens.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}

// Instantiate the sinks listed in the zero-argument arm.
def_remote_sink!();
132
/// Marker trait implemented by each remote sink variant generated by `def_remote_sink!`.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Name of the sink; `RemoteSink` and `CoordinatedRemoteSink` re-export it as their
    /// `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// Decoupling choice used when the user leaves `SinkDecouple::Default`; `true` unless
    /// overridden by the implementor.
    fn default_sink_decouple() -> bool {
        true
    }
}
139
/// A sink for the remote sink variant `R`, using `RemoteLogSinker` as its log sinker and no
/// real commit coordinator.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; cloned into the log sinker and passed to validation.
    param: SinkParam,
    /// Ties the sink to the variant `R` without storing a value of it.
    _phantom: PhantomData<R>,
}
145
/// The secret-enforced properties of a `RemoteSink` are exactly those of its variant `R`.
impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
149
150impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
151 type Error = SinkError;
152
153 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
154 Ok(Self {
155 param,
156 _phantom: PhantomData,
157 })
158 }
159}
160
161impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
162 type Coordinator = DummySinkCommitCoordinator;
163 type LogSinker = RemoteLogSinker;
164
165 const SINK_NAME: &'static str = R::SINK_NAME;
166
167 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
168 match user_specified {
169 SinkDecouple::Default => Ok(R::default_sink_decouple()),
170 SinkDecouple::Enable => Ok(true),
171 SinkDecouple::Disable => Ok(false),
172 }
173 }
174
175 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
176 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
177 }
178
179 async fn validate(&self) -> Result<()> {
180 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
181 Ok(())
182 }
183}
184
185async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
186 if is_remote_es_sink(sink_name)
192 && param.downstream_pk.len() > 1
193 && !param.properties.contains_key(ES_OPTION_DELIMITER)
194 {
195 bail!("Es sink only supports single pk or pk with delimiter option");
196 }
197 param.columns.iter().try_for_each(|col| {
199 match &col.data_type {
200 DataType::Int16
201 | DataType::Int32
202 | DataType::Int64
203 | DataType::Float32
204 | DataType::Float64
205 | DataType::Boolean
206 | DataType::Decimal
207 | DataType::Timestamp
208 | DataType::Timestamptz
209 | DataType::Varchar
210 | DataType::Date
211 | DataType::Time
212 | DataType::Interval
213 | DataType::Jsonb
214 | DataType::Bytea => Ok(()),
215 DataType::List(list) => {
216 if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
217 Ok(())
218 } else{
219 Err(SinkError::Remote(anyhow!(
220 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
221 col.name,
222 col.data_type,
223 )))
224 }
225 },
226 DataType::Struct(_) => {
227 if is_remote_es_sink(sink_name){
228 Ok(())
229 }else{
230 Err(SinkError::Remote(anyhow!(
231 "Only Es sink supports struct, got {:?}: {:?}",
232 col.name,
233 col.data_type,
234 )))
235 }
236 },
237 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
238 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
239 col.name,
240 col.data_type,
241 )))}})?;
242
243 let jvm = JVM.get_or_init()?;
244 let sink_param = param.to_proto();
245
246 spawn_blocking(move || -> anyhow::Result<()> {
247 execute_with_jni_env(jvm, |env| {
248 let validate_sink_request = ValidateSinkRequest {
249 sink_param: Some(sink_param),
250 };
251 let validate_sink_request_bytes =
252 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
253
254 let validate_sink_response_bytes = call_static_method!(
255 env,
256 {com.risingwave.connector.JniSinkValidationHandler},
257 {byte[] validate(byte[] validateSourceRequestBytes)},
258 &validate_sink_request_bytes
259 )?;
260
261 let validate_sink_response: ValidateSinkResponse = Message::decode(
262 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
263 )?;
264
265 validate_sink_response.error.map_or_else(
266 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
268 )
269 })
270 })
271 .await
272 .context("JoinHandle returns error")??;
273
274 Ok(())
275}
276
/// Log sinker that forwards log store items to a JNI sink writer stream and truncates the log
/// as responses come back.
pub struct RemoteLogSinker {
    /// Sends chunk and barrier requests to the sink writer stream.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Responses from the sink writer stream.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each stream chunk before it is sent.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
283
284impl RemoteLogSinker {
285 async fn new(
286 sink_param: SinkParam,
287 writer_param: SinkWriterParam,
288 sink_name: &str,
289 ) -> Result<Self> {
290 let sink_proto = sink_param.to_proto();
291 let payload_schema = if is_remote_es_sink(sink_name) {
292 let columns = vec![
293 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
294 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
295 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
296 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
297 ];
298 Some(TableSchema {
299 columns,
300 pk_indices: vec![],
301 })
302 } else {
303 sink_proto.table_schema.clone()
304 };
305
306 let SinkWriterStreamHandle {
307 request_sender,
308 response_stream,
309 } = EmbeddedConnectorClient::new()?
310 .start_sink_writer_stream(payload_schema, sink_proto)
311 .await?;
312
313 Ok(RemoteLogSinker {
314 request_sender,
315 response_stream,
316 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
317 stream_chunk_converter: StreamChunkConverter::new(
318 sink_name,
319 sink_param.schema(),
320 &sink_param.downstream_pk,
321 &sink_param.properties,
322 sink_param.sink_type.is_append_only(),
323 )?,
324 })
325 }
326}
327
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the sink: reads items from `log_reader`, sends them to the sink writer stream,
    /// and truncates the log when the matching responses arrive.
    ///
    /// Two futures are raced against each other: one forwards stream responses into a local
    /// channel, the other consumes both that channel and the log reader. Both only finish
    /// with an error, which is what this method returns.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Responses are relayed through an unbounded channel to the consuming future below.
        let (response_tx, mut response_rx) = unbounded_channel();

        // Forwards every response of the stream into `response_tx`; ends with an error when
        // the stream ends, yields an error, or the receiving side is gone.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Matches a persisted offset reported by the remote side against the queue of
            // sent offsets and truncates the log reader at that offset.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                // Drop queued offsets that are smaller than the persisted one.
                // NOTE(review): presumably these were never acknowledged individually — confirm.
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // The front of the queue must now be exactly the persisted offset.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                // A start time is only recorded for checkpoint barriers (see below), so this
                // observes the time between sending the commit and receiving its response.
                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item sent, used to check that offsets advance consistently.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent but not yet matched by a response, with the commit start time for
            // checkpoint barriers.
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a response or the next log item.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A batch was written: truncate at the matching chunk offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // An epoch was committed: truncate at the matching barrier offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Commit metadata is not expected here; it is only logged.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                // Convert the chunk before handing it to the writer stream.
                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                // Record the offset only after the request was sent.
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Only checkpoint barriers record a start time for the commit
                                // duration metric.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Return the output of whichever future finishes first.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
523
/// A sink for the remote sink variant `R` that commits through a `RemoteCoordinator`.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; cloned into the writer, the log sinker and the coordinator.
    param: SinkParam,
    /// Ties the sink to the variant `R` without storing a value of it.
    _phantom: PhantomData<R>,
}
529
/// The secret-enforced properties of a `CoordinatedRemoteSink` are exactly those of its
/// variant `R`.
impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
533
534impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
535 type Error = SinkError;
536
537 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
538 Ok(Self {
539 param,
540 _phantom: PhantomData,
541 })
542 }
543}
544
545impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
546 type Coordinator = RemoteCoordinator;
547 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
548
549 const SINK_NAME: &'static str = R::SINK_NAME;
550
551 async fn validate(&self) -> Result<()> {
552 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
553 Ok(())
554 }
555
556 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
557 let metrics = SinkWriterMetrics::new(&writer_param);
558 CoordinatedLogSinker::new(
559 &writer_param,
560 self.param.clone(),
561 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
562 NonZero::new(1).unwrap(),
563 )
564 .await
565 }
566
567 fn is_coordinated_sink(&self) -> bool {
568 true
569 }
570
571 async fn new_coordinator(
572 &self,
573 _db: DatabaseConnection,
574 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
575 ) -> Result<Self::Coordinator> {
576 RemoteCoordinator::new::<R>(self.param.clone()).await
577 }
578}
579
/// Sink writer that sends chunks and barriers over a JNI sink writer stream and returns the
/// commit metadata of each checkpoint.
pub struct CoordinatedRemoteSinkWriter {
    /// Current epoch; `None` until `begin_epoch` is called.
    epoch: Option<u64>,
    /// Id attached to the next chunk request; incremented after each chunk is sent.
    batch_id: u64,
    /// Request/response handle of the sink writer stream.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated on every written batch.
    metrics: SinkWriterMetrics,
}
586
587impl CoordinatedRemoteSinkWriter {
588 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
589 let sink_proto = param.to_proto();
590 let stream_handle = EmbeddedConnectorClient::new()?
591 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
592 .await?;
593
594 Ok(Self {
595 epoch: None,
596 batch_id: 0,
597 stream_handle,
598 metrics,
599 })
600 }
601
602 #[cfg(test)]
603 fn for_test(
604 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
605 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
606 ) -> CoordinatedRemoteSinkWriter {
607 use futures::StreamExt;
608
609 let stream_handle = SinkWriterStreamHandle::for_test(
610 request_sender,
611 ReceiverStream::new(response_receiver)
612 .map_err(RpcError::from)
613 .boxed(),
614 );
615
616 CoordinatedRemoteSinkWriter {
617 epoch: None,
618 batch_id: 0,
619 stream_handle,
620 metrics: SinkWriterMetrics::for_test(),
621 }
622 }
623}
624
625#[async_trait]
626impl SinkWriter for CoordinatedRemoteSinkWriter {
627 type CommitMetadata = Option<SinkMetadata>;
628
629 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
630 let cardinality = chunk.cardinality();
631 self.metrics
632 .connector_sink_rows_received
633 .inc_by(cardinality as _);
634
635 let epoch = self.epoch.ok_or_else(|| {
636 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
637 })?;
638 let batch_id = self.batch_id;
639 self.stream_handle
640 .request_sender
641 .send_request(JniSinkWriterStreamRequest::Chunk {
642 chunk,
643 epoch,
644 batch_id,
645 })
646 .await?;
647 self.batch_id += 1;
648 Ok(())
649 }
650
651 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
652 self.epoch = Some(epoch);
653 Ok(())
654 }
655
656 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
657 let epoch = self.epoch.ok_or_else(|| {
658 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
659 })?;
660 if is_checkpoint {
661 let rsp = self.stream_handle.commit(epoch).await?;
663 rsp.metadata
664 .ok_or_else(|| {
665 SinkError::Remote(anyhow!(
666 "get none metadata in commit response for coordinated sink writer"
667 ))
668 })
669 .map(Some)
670 } else {
671 self.stream_handle.barrier(epoch).await?;
672 Ok(None)
673 }
674 }
675}
676
/// Commit coordinator that forwards commits to a JNI sink coordinator stream.
pub struct RemoteCoordinator {
    /// Handle of the sink coordinator stream used to send commit requests.
    stream_handle: SinkCoordinatorStreamHandle,
}
680
681impl RemoteCoordinator {
682 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
683 let stream_handle = EmbeddedConnectorClient::new()?
684 .start_sink_coordinator_stream(param.clone())
685 .await?;
686
687 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
688
689 Ok(RemoteCoordinator { stream_handle })
690 }
691}
692
693#[async_trait]
694impl SinkCommitCoordinator for RemoteCoordinator {
695 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
696 Ok(None)
697 }
698
699 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
700 Ok(self.stream_handle.commit(epoch, metadata).await?)
701 }
702}
703
/// Client that starts sink writer and sink coordinator streams on the embedded JVM via JNI.
struct EmbeddedConnectorClient {
    /// The process-wide JVM obtained from `JVM.get_or_init()`.
    jvm: &'static JavaVM,
}
707
708impl EmbeddedConnectorClient {
709 fn new() -> Result<Self> {
710 let jvm = JVM
711 .get_or_init()
712 .context("failed to create EmbeddedConnectorClient")?;
713 Ok(EmbeddedConnectorClient { jvm })
714 }
715
716 async fn start_sink_writer_stream(
717 &self,
718 payload_schema: Option<TableSchema>,
719 sink_proto: PbSinkParam,
720 ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
721 let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
722 SinkWriterStreamRequest {
723 request: Some(SinkRequest::Start(StartSink {
724 sink_param: Some(sink_proto),
725 payload_schema,
726 })),
727 },
728 |rx| async move {
729 let rx = self.start_jvm_worker_thread(
730 gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
731 "runJniSinkWriterThread",
732 rx,
733 );
734 Ok(ReceiverStream::new(rx).map_err(RpcError::from))
735 },
736 )
737 .await?;
738
739 match first_rsp {
740 SinkWriterStreamResponse {
741 response: Some(sink_writer_stream_response::Response::Start(_)),
742 } => Ok(handle),
743 msg => Err(SinkError::Internal(anyhow!(
744 "should get start response but get {:?}",
745 msg
746 ))),
747 }
748 }
749
750 async fn start_sink_coordinator_stream(
751 &self,
752 param: SinkParam,
753 ) -> Result<SinkCoordinatorStreamHandle> {
754 let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
755 SinkCoordinatorStreamRequest {
756 request: Some(sink_coordinator_stream_request::Request::Start(
757 StartCoordinator {
758 param: Some(param.to_proto()),
759 },
760 )),
761 },
762 |rx| async move {
763 let rx = self.start_jvm_worker_thread(
764 gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
765 "runJniSinkCoordinatorThread",
766 rx,
767 );
768 Ok(ReceiverStream::new(rx).map_err(RpcError::from))
769 },
770 )
771 .await?;
772
773 match first_rsp {
774 SinkCoordinatorStreamResponse {
775 response: Some(sink_coordinator_stream_response::Response::Start(_)),
776 } => Ok(handle),
777 msg => Err(SinkError::Internal(anyhow!(
778 "should get start response but get {:?}",
779 msg
780 ))),
781 }
782 }
783
784 fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
785 &self,
786 class_name: &'static str,
787 method_name: &'static str,
788 mut request_rx: JniReceiverType<REQ>,
789 ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
790 let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
791 mpsc::channel(DEFAULT_BUFFER_SIZE);
792
793 let jvm = self.jvm;
794 std::thread::spawn(move || {
795 let result = execute_with_jni_env(jvm, |env| {
796 let result = call_static_method!(
797 env,
798 class_name,
799 method_name,
800 {{void}, {long requestRx, long responseTx}},
801 &mut request_rx as *mut JniReceiverType<REQ>,
802 &mut response_tx as *mut JniSenderType<RSP>
803 );
804
805 match result {
806 Ok(_) => {
807 tracing::debug!("end of jni call {}::{}", class_name, method_name);
808 }
809 Err(e) => {
810 tracing::error!(error = %e.as_report(), "jni call error");
811 }
812 };
813
814 Ok(())
815 });
816
817 if let Err(e) = result {
818 let _ = response_tx.blocking_send(Err(e));
819 }
820 });
821 response_rx
822 }
823}
824
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// Without a preceding `begin_epoch`, both `barrier` and `write_batch` must fail and must
    /// not emit any request.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
        ",
        );

        // Committing before any epoch is set must be rejected.
        let commit_result = tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
            .await
            .expect("test failed: should not commit without epoch");
        assert!(
            commit_result.is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // Writing before any epoch is set must be rejected as well.
        let write_result = tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
            .await
            .expect("test failed: should not write without epoch");
        assert!(
            write_result.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Walks through two epochs and checks the requests produced for chunks and a
    /// non-checkpoint barrier, as well as the writer's epoch and batch id bookkeeping.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
        ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
        ",
        );

        // First epoch: one chunk.
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2022);
        assert_eq!(batch_id, 0);
        assert_eq!(chunk, chunk_a);

        // Queue a commit response, then send a non-checkpoint barrier.
        let commit_response = SinkWriterStreamResponse {
            response: Some(Response::Commit(CommitResponse {
                epoch: 2022,
                metadata: None,
            })),
        };
        response_sender
            .send(Ok(commit_response))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch,
                    is_checkpoint: false,
                })),
        }) = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(epoch, 2022);

        // Second epoch: the batch id keeps counting across epochs.
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2023);
        assert_eq!(batch_id, 1);
        assert_eq!(chunk, chunk_b);
    }
}