1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::global_jvm::JVM;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::types::DataType;
36use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
37use risingwave_jni_core::{
38 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
39};
40use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
41use risingwave_pb::connector_service::sink_writer_stream_request::{
42 Request as SinkRequest, StartSink,
43};
44use risingwave_pb::connector_service::{
45 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
46 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
47 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
48 sink_writer_stream_response,
49};
50use risingwave_rpc_client::error::RpcError;
51use risingwave_rpc_client::{
52 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
53 SinkWriterStreamHandle,
54};
55use rw_futures_util::drop_either_future;
56use sea_orm::DatabaseConnection;
57use thiserror_ext::AsReport;
58use tokio::sync::mpsc;
59use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
60use tokio::task::spawn_blocking;
61use tokio_stream::wrappers::ReceiverStream;
62use tracing::warn;
63
64use super::SinkCommittedEpochSubscriber;
65use super::elasticsearch_opensearch::elasticsearch_converter::{
66 StreamChunkConverter, is_remote_es_sink,
67};
68use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
69use crate::connector_common::IcebergSinkCompactionUpdate;
70use crate::enforce_secret::EnforceSecret;
71use crate::error::ConnectorResult;
72use crate::sink::coordinate::CoordinatedLogSinker;
73use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
74use crate::sink::writer::SinkWriter;
75use crate::sink::{
76 DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
77 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
78};
79
/// Generates the marker types for remote sinks.
///
/// Each entry has the form
/// `{ Variant, SinkAlias, "sink_name", [ "secret.property", ... ] }` and expands to:
/// - a unit struct `Variant` deriving `Debug`;
/// - `impl RemoteSinkTrait for Variant` with `SINK_NAME` set to `"sink_name"`;
/// - `impl EnforceSecret for Variant` whose `ENFORCE_SECRET_PROPERTIES` set holds the listed
///   property names;
/// - `pub type SinkAlias = RemoteSink<Variant>`.
///
/// Invoking the macro without arguments expands the built-in list (Cassandra and JDBC).
macro_rules! def_remote_sink {
    // No arguments: expand the built-in list of remote sinks.
    () => {
        def_remote_sink! {
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // A single entry that keeps the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A single entry with a trailing `|desc| body` that overrides `default_sink_decouple`.
    // NOTE(review): no entry in the built-in list uses this arm, and the method it emits takes a
    // `&SinkDesc` argument while `RemoteSinkTrait::default_sink_decouple` takes none — confirm
    // this arm still compiles before relying on it.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Several entries: expand the first one, then recurse on the remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Catch-all: report the unmatched tokens as a compile error.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}

// Instantiate the built-in remote sinks (`Cassandra`/`CassandraSink`, `Jdbc`/`JdbcSink`).
def_remote_sink!();
133
/// Marker trait implemented by each remote sink variant generated by `def_remote_sink!`.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Connector name of the sink; reused as `Sink::SINK_NAME` by the sinks in this file.
    const SINK_NAME: &'static str;
    /// Whether sink decoupling is enabled when the user leaves the setting at
    /// `SinkDecouple::Default`. Enabled unless a variant overrides this method.
    fn default_sink_decouple() -> bool {
        true
    }
}
140
/// A sink for the remote connector identified by the marker type `R`.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created with; cloned into every log sinker.
    param: SinkParam,
    /// Ties the sink to its variant `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
146
/// The sink enforces exactly the secret properties declared by its variant `R`.
impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
150
151impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
152 type Error = SinkError;
153
154 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
155 Ok(Self {
156 param,
157 _phantom: PhantomData,
158 })
159 }
160}
161
162impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
163 type Coordinator = DummySinkCommitCoordinator;
164 type LogSinker = RemoteLogSinker;
165
166 const SINK_NAME: &'static str = R::SINK_NAME;
167
168 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
169 match user_specified {
170 SinkDecouple::Default => Ok(R::default_sink_decouple()),
171 SinkDecouple::Enable => Ok(true),
172 SinkDecouple::Disable => Ok(false),
173 }
174 }
175
176 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
177 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
178 }
179
180 async fn validate(&self) -> Result<()> {
181 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
182 Ok(())
183 }
184}
185
186async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
187 if is_remote_es_sink(sink_name)
193 && param.downstream_pk.len() > 1
194 && !param.properties.contains_key(ES_OPTION_DELIMITER)
195 {
196 bail!("Es sink only supports single pk or pk with delimiter option");
197 }
198 param.columns.iter().try_for_each(|col| {
200 match &col.data_type {
201 DataType::Int16
202 | DataType::Int32
203 | DataType::Int64
204 | DataType::Float32
205 | DataType::Float64
206 | DataType::Boolean
207 | DataType::Decimal
208 | DataType::Timestamp
209 | DataType::Timestamptz
210 | DataType::Varchar
211 | DataType::Date
212 | DataType::Time
213 | DataType::Interval
214 | DataType::Jsonb
215 | DataType::Bytea => Ok(()),
216 DataType::List(list) => {
217 if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
218 Ok(())
219 } else{
220 Err(SinkError::Remote(anyhow!(
221 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
222 col.name,
223 col.data_type,
224 )))
225 }
226 },
227 DataType::Struct(_) => {
228 if is_remote_es_sink(sink_name){
229 Ok(())
230 }else{
231 Err(SinkError::Remote(anyhow!(
232 "Only Es sink supports struct, got {:?}: {:?}",
233 col.name,
234 col.data_type,
235 )))
236 }
237 },
238 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
239 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
240 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
241 col.name,
242 col.data_type,
243 )))}})?;
244
245 let jvm = JVM.get_or_init()?;
246 let sink_param = param.to_proto();
247
248 spawn_blocking(move || -> anyhow::Result<()> {
249 execute_with_jni_env(jvm, |env| {
250 let validate_sink_request = ValidateSinkRequest {
251 sink_param: Some(sink_param),
252 };
253 let validate_sink_request_bytes =
254 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
255
256 let validate_sink_response_bytes = call_static_method!(
257 env,
258 {com.risingwave.connector.JniSinkValidationHandler},
259 {byte[] validate(byte[] validateSourceRequestBytes)},
260 &validate_sink_request_bytes
261 )?;
262
263 let validate_sink_response: ValidateSinkResponse = Message::decode(
264 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
265 )?;
266
267 validate_sink_response.error.map_or_else(
268 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
270 )
271 })
272 })
273 .await
274 .context("JoinHandle returns error")??;
275
276 Ok(())
277}
278
/// Log sinker that forwards log-store items to a sink-writer stream and truncates the log as
/// the stream acknowledges them.
pub struct RemoteLogSinker {
    /// Sends chunk and barrier requests to the sink writer.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Receives the sink writer's responses.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each stream chunk before it is sent.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
285
286impl RemoteLogSinker {
287 async fn new(
288 sink_param: SinkParam,
289 writer_param: SinkWriterParam,
290 sink_name: &str,
291 ) -> Result<Self> {
292 let sink_proto = sink_param.to_proto();
293 let payload_schema = if is_remote_es_sink(sink_name) {
294 let columns = vec![
295 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
296 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
297 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
298 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
299 ];
300 Some(TableSchema {
301 columns,
302 pk_indices: vec![],
303 })
304 } else {
305 sink_proto.table_schema.clone()
306 };
307
308 let SinkWriterStreamHandle {
309 request_sender,
310 response_stream,
311 } = EmbeddedConnectorClient::new()?
312 .start_sink_writer_stream(payload_schema, sink_proto)
313 .await?;
314
315 Ok(RemoteLogSinker {
316 request_sender,
317 response_stream,
318 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
319 stream_chunk_converter: StreamChunkConverter::new(
320 sink_name,
321 sink_param.schema(),
322 &sink_param.downstream_pk,
323 &sink_param.properties,
324 sink_param.sink_type.is_append_only(),
325 )?,
326 })
327 }
328}
329
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Pumps the log store into the sink-writer stream and truncates the log as responses
    /// arrive.
    ///
    /// Two futures are raced against each other:
    /// - `poll_response_stream` forwards every response from the sink-writer stream into an
    ///   in-process unbounded channel;
    /// - `poll_consume_log_and_sink` alternates between handling a forwarded response
    ///   (truncating the log up to the acknowledged offset) and sending the next log item.
    ///
    /// Both futures loop forever and only finish by returning an error, so this method only
    /// returns an `Err`.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        let (response_tx, mut response_rx) = unbounded_channel();

        // Forward responses from the stream into `response_tx`; a closed stream, a stream error
        // or a closed channel ends this future with an error.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Matches an acknowledged offset against the queue of sent offsets and truncates
            // the log at that offset.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                // Drop sent offsets that compare smaller than the acknowledged one.
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // The next queued offset must be exactly the acknowledged one.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                // Only barriers sent with a start time (checkpoint barriers) report a commit
                // duration.
                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item sent, used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent but not yet acknowledged: pushed at the back, popped from the front.
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a forwarded response or the next log item.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A written batch acknowledges the chunk offset (epoch, batch_id).
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A commit acknowledges the barrier offset of `epoch`; metadata is
                            // not expected here and is only logged.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response is a protocol error.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                // Convert the chunk, send it, then remember its offset until it
                                // is acknowledged.
                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Checkpoint barriers record when they were sent so the commit
                                // duration can be observed once they are acknowledged.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Return the output of whichever future finishes first; the other one is dropped.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
525
/// A remote sink for variant `R` that commits through a coordinator: its `Sink` impl reports
/// `is_coordinated_sink() == true` and builds a `RemoteCoordinator`.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created with; cloned into writers and the coordinator.
    param: SinkParam,
    /// Ties the sink to its variant `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
531
/// The coordinated sink enforces exactly the secret properties declared by its variant `R`.
impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
535
536impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
537 type Error = SinkError;
538
539 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
540 Ok(Self {
541 param,
542 _phantom: PhantomData,
543 })
544 }
545}
546
547impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
548 type Coordinator = RemoteCoordinator;
549 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
550
551 const SINK_NAME: &'static str = R::SINK_NAME;
552
553 async fn validate(&self) -> Result<()> {
554 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
555 Ok(())
556 }
557
558 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
559 let metrics = SinkWriterMetrics::new(&writer_param);
560 CoordinatedLogSinker::new(
561 &writer_param,
562 self.param.clone(),
563 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
564 NonZero::new(1).unwrap(),
565 )
566 .await
567 }
568
569 fn is_coordinated_sink(&self) -> bool {
570 true
571 }
572
573 async fn new_coordinator(
574 &self,
575 _db: DatabaseConnection,
576 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
577 ) -> Result<Self::Coordinator> {
578 RemoteCoordinator::new::<R>(self.param.clone()).await
579 }
580}
581
/// Sink writer used by `CoordinatedRemoteSink`; it sends chunks and barriers over a
/// sink-writer stream.
pub struct CoordinatedRemoteSinkWriter {
    /// Current epoch; `None` until `begin_epoch` has been called.
    epoch: Option<u64>,
    /// Id attached to the next chunk; incremented after each chunk is sent successfully.
    batch_id: u64,
    /// Request/response handle of the sink-writer stream.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated while writing (rows received).
    metrics: SinkWriterMetrics,
}
588
589impl CoordinatedRemoteSinkWriter {
590 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
591 let sink_proto = param.to_proto();
592 let stream_handle = EmbeddedConnectorClient::new()?
593 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
594 .await?;
595
596 Ok(Self {
597 epoch: None,
598 batch_id: 0,
599 stream_handle,
600 metrics,
601 })
602 }
603
604 #[cfg(test)]
605 fn for_test(
606 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
607 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
608 ) -> CoordinatedRemoteSinkWriter {
609 use futures::StreamExt;
610
611 let stream_handle = SinkWriterStreamHandle::for_test(
612 request_sender,
613 ReceiverStream::new(response_receiver)
614 .map_err(RpcError::from)
615 .boxed(),
616 );
617
618 CoordinatedRemoteSinkWriter {
619 epoch: None,
620 batch_id: 0,
621 stream_handle,
622 metrics: SinkWriterMetrics::for_test(),
623 }
624 }
625}
626
627#[async_trait]
628impl SinkWriter for CoordinatedRemoteSinkWriter {
629 type CommitMetadata = Option<SinkMetadata>;
630
631 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
632 let cardinality = chunk.cardinality();
633 self.metrics
634 .connector_sink_rows_received
635 .inc_by(cardinality as _);
636
637 let epoch = self.epoch.ok_or_else(|| {
638 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
639 })?;
640 let batch_id = self.batch_id;
641 self.stream_handle
642 .request_sender
643 .send_request(JniSinkWriterStreamRequest::Chunk {
644 chunk,
645 epoch,
646 batch_id,
647 })
648 .await?;
649 self.batch_id += 1;
650 Ok(())
651 }
652
653 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
654 self.epoch = Some(epoch);
655 Ok(())
656 }
657
658 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
659 let epoch = self.epoch.ok_or_else(|| {
660 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
661 })?;
662 if is_checkpoint {
663 let rsp = self.stream_handle.commit(epoch).await?;
665 rsp.metadata
666 .ok_or_else(|| {
667 SinkError::Remote(anyhow!(
668 "get none metadata in commit response for coordinated sink writer"
669 ))
670 })
671 .map(Some)
672 } else {
673 self.stream_handle.barrier(epoch).await?;
674 Ok(None)
675 }
676 }
677}
678
/// Commit coordinator that forwards commits over a sink-coordinator stream.
pub struct RemoteCoordinator {
    /// Handle of the sink-coordinator stream used by `commit`.
    stream_handle: SinkCoordinatorStreamHandle,
}
682
683impl RemoteCoordinator {
684 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
685 let stream_handle = EmbeddedConnectorClient::new()?
686 .start_sink_coordinator_stream(param.clone())
687 .await?;
688
689 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
690
691 Ok(RemoteCoordinator { stream_handle })
692 }
693}
694
695#[async_trait]
696impl SinkCommitCoordinator for RemoteCoordinator {
697 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
698 Ok(None)
699 }
700
701 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
702 Ok(self.stream_handle.commit(epoch, metadata).await?)
703 }
704}
705
/// Client that starts sink writer/coordinator streams backed by threads running inside the
/// process-wide JVM.
struct EmbeddedConnectorClient {
    /// The global JVM obtained from `JVM.get_or_init()`.
    jvm: &'static JavaVM,
}
709
impl EmbeddedConnectorClient {
    /// Creates a client bound to the global JVM, initializing the JVM if necessary.
    fn new() -> Result<Self> {
        let jvm = JVM
            .get_or_init()
            .context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink-writer stream served by `JniSinkWriterHandler.runJniSinkWriterThread`.
    ///
    /// The stream is initialized with a `StartSink` request carrying `sink_proto` and
    /// `payload_schema`. The first response must be a `Start` response; anything else is
    /// reported as an internal error.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // Spawn the JVM worker and expose its responses as a stream of `RpcError` results.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a sink-coordinator stream served by
    /// `JniSinkCoordinatorHandler.runJniSinkCoordinatorThread`.
    ///
    /// The stream is initialized with a `StartCoordinator` request built from `param`. The
    /// first response must be a `Start` response; anything else is reported as an internal
    /// error.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            // Spawn the JVM worker and expose its responses as a stream of `RpcError` results.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Spawns an OS thread that calls the static Java method `class_name::method_name` and
    /// returns the receiving end of the response channel.
    ///
    /// The Java method receives the addresses of `request_rx` and of the response sender as
    /// `long`s. Both values are owned by the spawned thread's closure, so they stay alive for
    /// the whole duration of the call.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                // NOTE(review): a failing JNI call is only logged; it is not forwarded to the
                // response channel because the closure returns `Ok(())` either way.
                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // A failure of `execute_with_jni_env` itself is reported through the response
            // channel; a failed send is ignored.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
826
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// `barrier` and `write_batch` must fail, without sending any request, when `begin_epoch`
    /// has not been called.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        // The response sender is dropped right away; this test never produces a response.
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
            ",
        );

        // Committing without an epoch must return an error within the timeout...
        assert!(
            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
                .await
                .expect("test failed: should not commit without epoch")
                .is_err(),
            "test failed: no epoch check for commit()"
        );
        // ...and must not have sent anything.
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // Writing without an epoch must return an error within the timeout...
        assert!(
            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
                .await
                .expect("test failed: should not write without epoch")
                .is_err(),
            "test failed: no epoch check for write_batch()"
        );
        // ...and must not have sent anything either.
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Walks the writer through two epochs and checks the requests it emits.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
            ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
            ",
        );

        // First epoch: the first chunk is sent with batch id 0 and the counter moves to 1.
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2022);
                assert_eq!(batch_id, 0);
                assert_eq!(chunk, chunk_a);
            }
            _ => panic!("test failed: failed to construct write request"),
        }

        // Queue a commit response, then send a non-checkpoint barrier and check the request.
        // NOTE(review): whether `barrier(false)` consumes the queued response is not visible
        // in this file.
        response_sender
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let commit_request = request_receiver.recv().await.unwrap();
        match commit_request {
            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
                request:
                    Some(Request::Barrier(Barrier {
                        epoch,
                        is_checkpoint: false,
                    })),
            }) => {
                assert_eq!(epoch, 2022);
            }
            _ => panic!("test failed: failed to construct sync request "),
        };

        // Second epoch: the batch id keeps counting across epochs.
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2023);
                assert_eq!(batch_id, 1);
                assert_eq!(chunk, chunk_b);
            }
            _ => panic!("test failed: failed to construct write request"),
        }
    }
}
967}