1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId};
32use risingwave_common::session_config::sink_decouple::SinkDecouple;
33use risingwave_common::types::DataType;
34use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
35use risingwave_jni_core::{
36 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
37};
38use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
39use risingwave_pb::connector_service::sink_writer_stream_request::{
40 Request as SinkRequest, StartSink,
41};
42use risingwave_pb::connector_service::{
43 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
44 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
45 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
46 sink_writer_stream_response,
47};
48use risingwave_rpc_client::error::RpcError;
49use risingwave_rpc_client::{
50 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
51 SinkWriterStreamHandle,
52};
53use rw_futures_util::drop_either_future;
54use sea_orm::DatabaseConnection;
55use thiserror_ext::AsReport;
56use tokio::sync::mpsc;
57use tokio::sync::mpsc::{Receiver, unbounded_channel};
58use tokio::task::spawn_blocking;
59use tokio_stream::wrappers::ReceiverStream;
60use tracing::warn;
61
62use super::SinkCommittedEpochSubscriber;
63use super::elasticsearch_opensearch::elasticsearch_converter::{
64 StreamChunkConverter, is_remote_es_sink,
65};
66use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
67use crate::error::ConnectorResult;
68use crate::sink::coordinate::CoordinatedLogSinker;
69use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
70use crate::sink::writer::SinkWriter;
71use crate::sink::{
72 DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
73 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
74};
75
/// Declares the marker type and the `RemoteSink` type alias of every remote sink.
///
/// Each entry has the shape `{ VariantName, SinkTypeName, "sink_name" }`. For an entry the
/// macro emits a unit struct `VariantName`, an `impl RemoteSinkTrait` whose `SINK_NAME` is the
/// given string, and `pub type SinkTypeName = RemoteSink<VariantName>`.
///
/// Invoked without arguments it expands to the built-in list of sinks below.
macro_rules! def_remote_sink {
    // Entry point: expand the built-in list of remote sinks.
    () => {
        def_remote_sink! {
            { Cassandra, CassandraSink, "cassandra" }
            { Jdbc, JdbcSink, "jdbc" }
            { DeltaLake, DeltaLakeSink, "deltalake" }
            { HttpJava, HttpJavaSink, "http" }
        }
    };
    // Single entry that relies on the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Single entry that overrides `default_sink_decouple` with a closure-like `|desc| body`.
    // NOTE(review): this arm emits `default_sink_decouple($desc: &SinkDesc)`, while
    // `RemoteSinkTrait::default_sink_decouple` takes no arguments and `SinkDesc` is not among
    // the imports visible in this file. No built-in entry uses this arm; it looks like it
    // would not compile if one did -- confirm before relying on it.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Several entries: expand the first one, then recurse on the remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Anything else is rejected at compile time, echoing the offending tokens.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}
119
// Expand the built-in sink list: defines the `Cassandra`, `Jdbc`, `DeltaLake` and `HttpJava`
// marker types together with their `*Sink` type aliases.
def_remote_sink!();
121
/// Compile-time description of one remote sink variant.
///
/// The marker types generated by `def_remote_sink!` implement this trait; `RemoteSink<R>` and
/// `CoordinatedRemoteSink<R>` are parameterized by it.
pub trait RemoteSinkTrait: Send + Sync + 'static {
    /// Name of the sink; re-exported as `Sink::SINK_NAME` by the sinks built on this trait.
    const SINK_NAME: &'static str;
    /// Value used for sink decoupling when the user setting is `SinkDecouple::Default`.
    ///
    /// Defaults to `true`.
    fn default_sink_decouple() -> bool {
        true
    }
}
128
/// A sink whose variant is selected at compile time through `R`.
///
/// Its log sinker is `RemoteLogSinker` and it uses `DummySinkCommitCoordinator` as coordinator.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Parameters of the sink; cloned into every log sinker and used for validation.
    param: SinkParam,
    /// Ties the struct to the variant marker `R` without storing a value of it.
    _phantom: PhantomData<R>,
}
134
135impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
136 type Error = SinkError;
137
138 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
139 Ok(Self {
140 param,
141 _phantom: PhantomData,
142 })
143 }
144}
145
146impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
147 type Coordinator = DummySinkCommitCoordinator;
148 type LogSinker = RemoteLogSinker;
149
150 const SINK_NAME: &'static str = R::SINK_NAME;
151
152 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
153 match user_specified {
154 SinkDecouple::Default => Ok(R::default_sink_decouple()),
155 SinkDecouple::Enable => Ok(true),
156 SinkDecouple::Disable => Ok(false),
157 }
158 }
159
160 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
161 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
162 }
163
164 async fn validate(&self) -> Result<()> {
165 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
166 Ok(())
167 }
168}
169
170async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
171 if is_remote_es_sink(sink_name)
177 && param.downstream_pk.len() > 1
178 && !param.properties.contains_key(ES_OPTION_DELIMITER)
179 {
180 bail!("Es sink only supports single pk or pk with delimiter option");
181 }
182 param.columns.iter().try_for_each(|col| {
184 match &col.data_type {
185 DataType::Int16
186 | DataType::Int32
187 | DataType::Int64
188 | DataType::Float32
189 | DataType::Float64
190 | DataType::Boolean
191 | DataType::Decimal
192 | DataType::Timestamp
193 | DataType::Timestamptz
194 | DataType::Varchar
195 | DataType::Date
196 | DataType::Time
197 | DataType::Interval
198 | DataType::Jsonb
199 | DataType::Bytea => Ok(()),
200 DataType::List(list) => {
201 if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
202 Ok(())
203 } else{
204 Err(SinkError::Remote(anyhow!(
205 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
206 col.name,
207 col.data_type,
208 )))
209 }
210 },
211 DataType::Struct(_) => {
212 if is_remote_es_sink(sink_name){
213 Ok(())
214 }else{
215 Err(SinkError::Remote(anyhow!(
216 "Only Es sink supports struct, got {:?}: {:?}",
217 col.name,
218 col.data_type,
219 )))
220 }
221 },
222 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
223 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
224 col.name,
225 col.data_type,
226 )))}})?;
227
228 let jvm = JVM.get_or_init()?;
229 let sink_param = param.to_proto();
230
231 spawn_blocking(move || -> anyhow::Result<()> {
232 execute_with_jni_env(jvm, |env| {
233 let validate_sink_request = ValidateSinkRequest {
234 sink_param: Some(sink_param),
235 };
236 let validate_sink_request_bytes =
237 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
238
239 let validate_sink_response_bytes = call_static_method!(
240 env,
241 {com.risingwave.connector.JniSinkValidationHandler},
242 {byte[] validate(byte[] validateSourceRequestBytes)},
243 &validate_sink_request_bytes
244 )?;
245
246 let validate_sink_response: ValidateSinkResponse = Message::decode(
247 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
248 )?;
249
250 validate_sink_response.error.map_or_else(
251 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
253 )
254 })
255 })
256 .await
257 .context("JoinHandle returns error")??;
258
259 Ok(())
260}
261
/// Log sinker that forwards log store items to a sink writer stream and truncates the log
/// store based on the responses received from that stream.
pub struct RemoteLogSinker {
    /// Sending half of the writer stream (chunks and barriers go out through it).
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Receiving half of the writer stream (batch-written and commit responses).
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each chunk before it is sent to the writer.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
268
269impl RemoteLogSinker {
270 async fn new(
271 sink_param: SinkParam,
272 writer_param: SinkWriterParam,
273 sink_name: &str,
274 ) -> Result<Self> {
275 let sink_proto = sink_param.to_proto();
276 let payload_schema = if is_remote_es_sink(sink_name) {
277 let columns = vec![
278 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
279 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
280 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
281 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
282 ];
283 Some(TableSchema {
284 columns,
285 pk_indices: vec![],
286 })
287 } else {
288 sink_proto.table_schema.clone()
289 };
290
291 let SinkWriterStreamHandle {
292 request_sender,
293 response_stream,
294 } = EmbeddedConnectorClient::new()?
295 .start_sink_writer_stream(payload_schema, sink_proto)
296 .await?;
297
298 Ok(RemoteLogSinker {
299 request_sender,
300 response_stream,
301 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
302 stream_chunk_converter: StreamChunkConverter::new(
303 sink_name,
304 sink_param.schema(),
305 &sink_param.downstream_pk,
306 &sink_param.properties,
307 sink_param.sink_type.is_append_only(),
308 )?,
309 })
310 }
311}
312
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Pumps the log store into the sink writer stream and truncates the log store as the
    /// writer acknowledges offsets.
    ///
    /// Two futures are raced against each other:
    /// * `poll_response_stream` forwards every response of the writer stream into an unbounded
    ///   in-process channel and fails once the stream ends or yields an error;
    /// * `poll_consume_log_and_sink` waits on both that channel and the log reader, sending
    ///   chunks/barriers to the writer and handling the forwarded responses.
    ///
    /// Neither loop has a successful exit, so this function only ever returns an error.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        let (response_tx, mut response_rx) = unbounded_channel();

        // Forward responses from the writer stream into `response_tx` until the stream fails.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Matches an acknowledged offset against the queue of sent offsets and truncates
            // the log reader at it.
            //
            // Queued offsets that compare smaller than `persisted_offset` are discarded; the
            // next queued offset must then be exactly `persisted_offset`, otherwise an error
            // is returned. For an acknowledged barrier that carries a start time, the elapsed
            // time is recorded (in milliseconds) as the commit duration.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Last offset sent to the writer; used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent but not yet acknowledged: pushed at the back, popped from the front.
            // The `Instant` is only set for checkpoint barriers (start of the commit).
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a forwarded response (`Left`) or the next
                // log store item (`Right`).
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the forwarding side of the channel is gone.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk was written: acknowledge the matching chunk offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A commit finished: acknowledge the matching barrier offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Metadata is not used on this path; it is only logged.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response (including an empty one) is an error here.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Count the rows before conversion.
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                // The chunk id doubles as the batch id of the request.
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                // Record the offset only after the request was sent.
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Only checkpoint barriers are timed; the start time is taken
                                // right before the barrier request is sent.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Return the output of whichever future finishes first; the other one is dropped.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
508
/// Coordinated flavor of a remote sink whose variant is selected at compile time through `R`.
///
/// Its writers are `CoordinatedRemoteSinkWriter`s driven by a `CoordinatedLogSinker`, and its
/// commit coordinator is `RemoteCoordinator`.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Parameters of the sink; cloned for the writer, the log sinker and the coordinator.
    param: SinkParam,
    /// Ties the struct to the variant marker `R` without storing a value of it.
    _phantom: PhantomData<R>,
}
514
515impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
516 type Error = SinkError;
517
518 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
519 Ok(Self {
520 param,
521 _phantom: PhantomData,
522 })
523 }
524}
525
526impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
527 type Coordinator = RemoteCoordinator;
528 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
529
530 const SINK_NAME: &'static str = R::SINK_NAME;
531
532 async fn validate(&self) -> Result<()> {
533 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
534 Ok(())
535 }
536
537 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
538 let metrics = SinkWriterMetrics::new(&writer_param);
539 CoordinatedLogSinker::new(
540 &writer_param,
541 self.param.clone(),
542 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
543 NonZero::new(1).unwrap(),
544 )
545 .await
546 }
547
548 fn is_coordinated_sink(&self) -> bool {
549 true
550 }
551
552 async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
553 RemoteCoordinator::new::<R>(self.param.clone()).await
554 }
555}
556
/// Sink writer used by `CoordinatedRemoteSink`; it sends chunks and barriers over a sink
/// writer stream and hands the commit metadata back to its caller.
pub struct CoordinatedRemoteSinkWriter {
    /// Current epoch; `None` until `begin_epoch` has been called.
    epoch: Option<u64>,
    /// Id attached to the next chunk; starts at 0 and grows by one per successfully sent chunk.
    batch_id: u64,
    /// Bidirectional stream to the sink writer.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated on every written batch.
    metrics: SinkWriterMetrics,
}
563
564impl CoordinatedRemoteSinkWriter {
565 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
566 let sink_proto = param.to_proto();
567 let stream_handle = EmbeddedConnectorClient::new()?
568 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
569 .await?;
570
571 Ok(Self {
572 epoch: None,
573 batch_id: 0,
574 stream_handle,
575 metrics,
576 })
577 }
578
579 #[cfg(test)]
580 fn for_test(
581 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
582 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
583 ) -> CoordinatedRemoteSinkWriter {
584 use futures::StreamExt;
585
586 let stream_handle = SinkWriterStreamHandle::for_test(
587 request_sender,
588 ReceiverStream::new(response_receiver)
589 .map_err(RpcError::from)
590 .boxed(),
591 );
592
593 CoordinatedRemoteSinkWriter {
594 epoch: None,
595 batch_id: 0,
596 stream_handle,
597 metrics: SinkWriterMetrics::for_test(),
598 }
599 }
600}
601
602#[async_trait]
603impl SinkWriter for CoordinatedRemoteSinkWriter {
604 type CommitMetadata = Option<SinkMetadata>;
605
606 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
607 let cardinality = chunk.cardinality();
608 self.metrics
609 .connector_sink_rows_received
610 .inc_by(cardinality as _);
611
612 let epoch = self.epoch.ok_or_else(|| {
613 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
614 })?;
615 let batch_id = self.batch_id;
616 self.stream_handle
617 .request_sender
618 .send_request(JniSinkWriterStreamRequest::Chunk {
619 chunk,
620 epoch,
621 batch_id,
622 })
623 .await?;
624 self.batch_id += 1;
625 Ok(())
626 }
627
628 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
629 self.epoch = Some(epoch);
630 Ok(())
631 }
632
633 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
634 let epoch = self.epoch.ok_or_else(|| {
635 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
636 })?;
637 if is_checkpoint {
638 let rsp = self.stream_handle.commit(epoch).await?;
640 rsp.metadata
641 .ok_or_else(|| {
642 SinkError::Remote(anyhow!(
643 "get none metadata in commit response for coordinated sink writer"
644 ))
645 })
646 .map(Some)
647 } else {
648 self.stream_handle.barrier(epoch).await?;
649 Ok(None)
650 }
651 }
652}
653
/// Commit coordinator of `CoordinatedRemoteSink`; it forwards commits over a sink coordinator
/// stream.
pub struct RemoteCoordinator {
    /// Bidirectional stream to the sink coordinator.
    stream_handle: SinkCoordinatorStreamHandle,
}
657
658impl RemoteCoordinator {
659 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
660 let stream_handle = EmbeddedConnectorClient::new()?
661 .start_sink_coordinator_stream(param.clone())
662 .await?;
663
664 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
665
666 Ok(RemoteCoordinator { stream_handle })
667 }
668}
669
670#[async_trait]
671impl SinkCommitCoordinator for RemoteCoordinator {
672 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
673 Ok(None)
674 }
675
676 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
677 Ok(self.stream_handle.commit(epoch, metadata).await?)
678 }
679}
680
/// Client that starts sink writer / sink coordinator streams by running handler methods
/// inside the JVM via JNI.
struct EmbeddedConnectorClient {
    /// JVM obtained from `JVM.get_or_init()`.
    jvm: &'static JavaVM,
}
684
impl EmbeddedConnectorClient {
    /// Creates a client bound to the JVM returned by `JVM.get_or_init()`.
    ///
    /// # Errors
    ///
    /// Fails when the JVM cannot be obtained.
    fn new() -> Result<Self> {
        let jvm = JVM
            .get_or_init()
            .context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink writer stream.
    ///
    /// The stream is initialized with a `StartSink` request carrying `sink_proto` and
    /// `payload_schema`; its responses come from a worker thread running
    /// `JniSinkWriterHandler.runJniSinkWriterThread`.
    ///
    /// # Errors
    ///
    /// Fails when the initialization fails or when the first response is not a start response.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // `rx` is the request receiver handed to the JVM worker; the returned stream
            // yields the worker's responses with errors converted to `RpcError`.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        // The first response must acknowledge the start request.
        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a sink coordinator stream.
    ///
    /// The stream is initialized with a `StartCoordinator` request carrying `param`; its
    /// responses come from a worker thread running
    /// `JniSinkCoordinatorHandler.runJniSinkCoordinatorThread`.
    ///
    /// # Errors
    ///
    /// Fails when the initialization fails or when the first response is not a start response.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            // Same wiring as for the writer stream, with the coordinator handler.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        // The first response must acknowledge the start request.
        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Spawns a detached OS thread that runs the static method `class_name::method_name` in
    /// the JVM and returns the receiver of the responses it produces.
    ///
    /// The method receives two `long` arguments: raw pointers to `request_rx` and to the
    /// sender half of a freshly created response channel (capacity `DEFAULT_BUFFER_SIZE`).
    /// Both values are moved into the thread closure, so they stay alive for the whole call.
    ///
    /// An error returned by the JNI call itself is only logged. Since the inner closure
    /// always returns `Ok(())`, the error forwarded to the response channel can only
    /// originate from `execute_with_jni_env` itself.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        // The join handle is dropped: nobody waits for this thread.
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Best effort: a failed send (receiver already gone) is ignored.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
801
// Unit tests of `CoordinatedRemoteSinkWriter`, driven through plain channels instead of a JVM.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// Without a prior `begin_epoch`, both `barrier` and `write_batch` must fail without
    /// sending any request.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        // The response sender is dropped right away; no response is ever delivered.
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
 + 1 Ripper
 ",
        );

        // Committing without an epoch must return an error (and must not hang).
        assert!(
            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
                .await
                .expect("test failed: should not commit without epoch")
                .is_err(),
            "test failed: no epoch check for commit()"
        );
        // ... and nothing may have been sent to the writer.
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // Writing without an epoch must return an error as well.
        assert!(
            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
                .await
                .expect("test failed: should not write without epoch")
                .is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Walks through two epochs and checks the requests produced by `write_batch` and by a
    /// non-checkpoint `barrier`, as well as the epoch / batch id bookkeeping.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
 + 1 Alice
 + 2 Bob
 + 3 Clare
 ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
 + 4 David
 + 5 Eve
 + 6 Frank
 ",
        );

        // Beginning an epoch only records it.
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        // The first batch goes out with batch id 0; the counter then moves on to 1.
        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2022);
                assert_eq!(batch_id, 0);
                assert_eq!(chunk, chunk_a);
            }
            _ => panic!("test failed: failed to construct write request"),
        }

        // Pre-load a commit response on the response channel, then send a non-checkpoint
        // barrier and check the request it produces.
        response_sender
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let commit_request = request_receiver.recv().await.unwrap();
        match commit_request {
            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
                request:
                    Some(Request::Barrier(Barrier {
                        epoch,
                        is_checkpoint: false,
                    })),
            }) => {
                assert_eq!(epoch, 2022);
            }
            _ => panic!("test failed: failed to construct sync request "),
        };

        // A new epoch replaces the previous one.
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        // The batch id keeps counting across epochs: this batch has id 1.
        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2023);
                assert_eq!(batch_id, 1);
                assert_eq!(chunk, chunk_b);
            }
            _ => panic!("test failed: failed to construct write request"),
        }
    }
}