1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use phf::phf_set;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId};
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
36use risingwave_jni_core::{
37 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41 Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47 sink_writer_stream_response,
48};
49use risingwave_pb::stream_plan::PbSinkSchemaChange;
50use risingwave_rpc_client::error::RpcError;
51use risingwave_rpc_client::{
52 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
53 SinkWriterStreamHandle,
54};
55use rw_futures_util::drop_either_future;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::elasticsearch_opensearch::elasticsearch_converter::{
64 StreamChunkConverter, is_remote_es_sink,
65};
66use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
67use crate::connector_common::IcebergSinkCompactionUpdate;
68use crate::enforce_secret::EnforceSecret;
69use crate::error::ConnectorResult;
70use crate::sink::coordinate::CoordinatedLogSinker;
71use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
72use crate::sink::writer::SinkWriter;
73use crate::sink::{
74 LogSinker, Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError,
75 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
76};
77
78macro_rules! def_remote_sink {
79 () => {
80 def_remote_sink! {
81 { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
85 { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
86 }
87 };
88 ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
89 #[derive(Debug)]
90 pub struct $variant_name;
91 impl RemoteSinkTrait for $variant_name {
92 const SINK_NAME: &'static str = $sink_name;
93 }
94 impl EnforceSecret for $variant_name {
95 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
96 $($enforce_secret_prop),*
97 };
98 }
99 pub type $sink_type_name = RemoteSink<$variant_name>;
100 };
101 ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
102 #[derive(Debug)]
103 pub struct $variant_name;
104 impl RemoteSinkTrait for $variant_name {
105 const SINK_NAME: &'static str = $sink_name;
106 fn default_sink_decouple($desc: &SinkDesc) -> bool {
107 $body
108 }
109 }
110 impl EnforceSecret for $variant_name {
111 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
112 $($enforce_secret_prop),*
113 };
114 }
115 pub type $sink_type_name = RemoteSink<$variant_name>;
116 };
117 ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
118 def_remote_sink! {
119 {$($first)+}
120 }
121 def_remote_sink! {
122 $({$($rest)+})*
123 }
124 };
125 ($($invalid:tt)*) => {
126 compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
127 }
128}
129
130def_remote_sink!();
131
132pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
133 const SINK_NAME: &'static str;
134 fn default_sink_decouple() -> bool {
135 true
136 }
137}
138
139#[derive(Debug)]
140pub struct RemoteSink<R: RemoteSinkTrait> {
141 param: SinkParam,
142 _phantom: PhantomData<R>,
143}
144
145impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
146 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
147}
148
149impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
150 type Error = SinkError;
151
152 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
153 Ok(Self {
154 param,
155 _phantom: PhantomData,
156 })
157 }
158}
159
160impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
161 type LogSinker = RemoteLogSinker;
162
163 const SINK_NAME: &'static str = R::SINK_NAME;
164
165 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
166 match user_specified {
167 SinkDecouple::Default => Ok(R::default_sink_decouple()),
168 SinkDecouple::Enable => Ok(true),
169 SinkDecouple::Disable => Ok(false),
170 }
171 }
172
173 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
174 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
175 }
176
177 async fn validate(&self) -> Result<()> {
178 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
179 Ok(())
180 }
181}
182
183async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
184 if is_remote_es_sink(sink_name)
190 && param.downstream_pk_or_empty().len() > 1
191 && !param.properties.contains_key(ES_OPTION_DELIMITER)
192 {
193 bail!("Es sink only supports single pk or pk with delimiter option");
194 }
195 param.columns.iter().try_for_each(|col| {
197 match &col.data_type {
198 DataType::Int16
199 | DataType::Int32
200 | DataType::Int64
201 | DataType::Float32
202 | DataType::Float64
203 | DataType::Boolean
204 | DataType::Decimal
205 | DataType::Timestamp
206 | DataType::Timestamptz
207 | DataType::Varchar
208 | DataType::Date
209 | DataType::Time
210 | DataType::Interval
211 | DataType::Jsonb
212 | DataType::Bytea => Ok(()),
213 DataType::List(list) => {
214 if is_remote_es_sink(sink_name) || matches!(list.elem(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
215 Ok(())
216 } else{
217 Err(SinkError::Remote(anyhow!(
218 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
219 col.name,
220 col.data_type,
221 )))
222 }
223 },
224 DataType::Struct(_) => {
225 if is_remote_es_sink(sink_name){
226 Ok(())
227 }else{
228 Err(SinkError::Remote(anyhow!(
229 "Only Es sink supports struct, got {:?}: {:?}",
230 col.name,
231 col.data_type,
232 )))
233 }
234 },
235 DataType::Vector(_) |
236 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
237 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
238 col.name,
239 col.data_type,
240 )))}})?;
241
242 let jvm = Jvm::get_or_init()?;
243 let sink_param = param.to_proto();
244
245 spawn_blocking(move || -> anyhow::Result<()> {
246 execute_with_jni_env(jvm, |env| {
247 let validate_sink_request = ValidateSinkRequest {
248 sink_param: Some(sink_param),
249 };
250 let validate_sink_request_bytes =
251 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
252
253 let validate_sink_response_bytes = call_static_method!(
254 env,
255 {com.risingwave.connector.JniSinkValidationHandler},
256 {byte[] validate(byte[] validateSourceRequestBytes)},
257 &validate_sink_request_bytes
258 )?;
259
260 let validate_sink_response: ValidateSinkResponse = Message::decode(
261 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
262 )?;
263
264 validate_sink_response.error.map_or_else(
265 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
267 )
268 })
269 })
270 .await
271 .context("JoinHandle returns error")??;
272
273 Ok(())
274}
275
276pub struct RemoteLogSinker {
277 request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
278 response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
279 stream_chunk_converter: StreamChunkConverter,
280 sink_writer_metrics: SinkWriterMetrics,
281}
282
283impl RemoteLogSinker {
284 async fn new(
285 sink_param: SinkParam,
286 writer_param: SinkWriterParam,
287 sink_name: &str,
288 ) -> Result<Self> {
289 let sink_proto = sink_param.to_proto();
290 let payload_schema = if is_remote_es_sink(sink_name) {
291 let columns = vec![
292 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
293 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
294 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
295 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
296 ];
297 Some(TableSchema {
298 columns,
299 pk_indices: vec![],
300 })
301 } else {
302 sink_proto.table_schema.clone()
303 };
304
305 let SinkWriterStreamHandle {
306 request_sender,
307 response_stream,
308 } = EmbeddedConnectorClient::new()?
309 .start_sink_writer_stream(payload_schema, sink_proto)
310 .await?;
311
312 Ok(RemoteLogSinker {
313 request_sender,
314 response_stream,
315 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
316 stream_chunk_converter: StreamChunkConverter::new(
317 sink_name,
318 sink_param.schema(),
319 &sink_param.downstream_pk_or_empty(),
320 &sink_param.properties,
321 sink_param.sink_type.is_append_only(),
322 )?,
323 })
324 }
325}
326
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the remote sink until something fails. The `Ok` type is `!`, so this only
    /// ever returns `Err`.
    ///
    /// Two futures run concurrently, and the first one to finish decides the result:
    /// - `poll_response_stream` forwards every JVM response into a local unbounded channel,
    ///   and fails when the response stream errors or ends;
    /// - `poll_consume_log_and_sink` reads items from the log store, sends chunks and barriers
    ///   to the JVM, and truncates the log store as matching acknowledgements arrive.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Relays JVM responses so the main loop below can `select` over a plain channel
        // alongside the log reader.
        let (response_tx, mut response_rx) = unbounded_channel();

        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Truncates the log store at `persisted_offset` once the JVM acknowledges it.
            //
            // `queue` holds offsets already sent, in send order. Entries older than
            // `persisted_offset` are dropped without their own acknowledgement (presumably
            // the JVM does not have to acknowledge every chunk individually; confirm). The
            // next entry must then equal `persisted_offset`, otherwise this is an error.
            // Only checkpoint barriers carry a start time. For them, the elapsed time in
            // milliseconds is recorded in `sink_commit_duration`.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Last offset sent to the JVM, used to verify log items arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent but not yet acknowledged, paired with a commit start time
            // (set only for checkpoint barriers).
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a JVM response or the next log-store item.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk was written: its `batch_id` is the log-store `chunk_id`.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A barrier was acknowledged. This sinker is not coordinated, so
                            // any metadata is unexpected and only logged.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Only checkpoint barriers record a start time, which the
                                // matching commit response turns into a latency sample.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // The first future to finish (always with an error) determines the result; the
        // other one is dropped.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
522
523#[derive(Debug)]
524pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
525 param: SinkParam,
526 _phantom: PhantomData<R>,
527}
528
529impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
530 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
531}
532
533impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
534 type Error = SinkError;
535
536 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
537 Ok(Self {
538 param,
539 _phantom: PhantomData,
540 })
541 }
542}
543
544impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
545 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
546
547 const SINK_NAME: &'static str = R::SINK_NAME;
548
549 async fn validate(&self) -> Result<()> {
550 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
551 Ok(())
552 }
553
554 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
555 let metrics = SinkWriterMetrics::new(&writer_param);
556 CoordinatedLogSinker::new(
557 &writer_param,
558 self.param.clone(),
559 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
560 NonZero::new(1).unwrap(),
561 )
562 .await
563 }
564
565 fn is_coordinated_sink(&self) -> bool {
566 true
567 }
568
569 async fn new_coordinator(
570 &self,
571 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
572 ) -> Result<SinkCommitCoordinator> {
573 let coordinator = RemoteCoordinator::new::<R>(self.param.clone()).await?;
574 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
575 }
576}
577
578pub struct CoordinatedRemoteSinkWriter {
579 epoch: Option<u64>,
580 batch_id: u64,
581 stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
582 metrics: SinkWriterMetrics,
583}
584
585impl CoordinatedRemoteSinkWriter {
586 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
587 let sink_proto = param.to_proto();
588 let stream_handle = EmbeddedConnectorClient::new()?
589 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
590 .await?;
591
592 Ok(Self {
593 epoch: None,
594 batch_id: 0,
595 stream_handle,
596 metrics,
597 })
598 }
599
600 #[cfg(test)]
601 fn for_test(
602 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
603 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
604 ) -> CoordinatedRemoteSinkWriter {
605 use futures::StreamExt;
606
607 let stream_handle = SinkWriterStreamHandle::for_test(
608 request_sender,
609 ReceiverStream::new(response_receiver)
610 .map_err(RpcError::from)
611 .boxed(),
612 );
613
614 CoordinatedRemoteSinkWriter {
615 epoch: None,
616 batch_id: 0,
617 stream_handle,
618 metrics: SinkWriterMetrics::for_test(),
619 }
620 }
621}
622
623#[async_trait]
624impl SinkWriter for CoordinatedRemoteSinkWriter {
625 type CommitMetadata = Option<SinkMetadata>;
626
627 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
628 let cardinality = chunk.cardinality();
629 self.metrics
630 .connector_sink_rows_received
631 .inc_by(cardinality as _);
632
633 let epoch = self.epoch.ok_or_else(|| {
634 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
635 })?;
636 let batch_id = self.batch_id;
637 self.stream_handle
638 .request_sender
639 .send_request(JniSinkWriterStreamRequest::Chunk {
640 chunk,
641 epoch,
642 batch_id,
643 })
644 .await?;
645 self.batch_id += 1;
646 Ok(())
647 }
648
649 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
650 self.epoch = Some(epoch);
651 Ok(())
652 }
653
654 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
655 let epoch = self.epoch.ok_or_else(|| {
656 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
657 })?;
658 if is_checkpoint {
659 let rsp = self.stream_handle.commit(epoch).await?;
661 rsp.metadata
662 .ok_or_else(|| {
663 SinkError::Remote(anyhow!(
664 "get none metadata in commit response for coordinated sink writer"
665 ))
666 })
667 .map(Some)
668 } else {
669 self.stream_handle.barrier(epoch).await?;
670 Ok(None)
671 }
672 }
673}
674
675pub struct RemoteCoordinator {
676 stream_handle: SinkCoordinatorStreamHandle,
677}
678
679impl RemoteCoordinator {
680 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
681 let stream_handle = EmbeddedConnectorClient::new()?
682 .start_sink_coordinator_stream(param.clone())
683 .await?;
684
685 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
686
687 Ok(RemoteCoordinator { stream_handle })
688 }
689}
690
691#[async_trait]
692impl SinglePhaseCommitCoordinator for RemoteCoordinator {
693 async fn init(&mut self) -> Result<()> {
694 Ok(())
695 }
696
697 async fn commit(
698 &mut self,
699 epoch: u64,
700 metadata: Vec<SinkMetadata>,
701 schema_change: Option<PbSinkSchemaChange>,
702 ) -> Result<()> {
703 if let Some(schema_change) = schema_change {
704 return Err(anyhow!(
705 "remote coordinator not support schema change, but got: {:?}",
706 schema_change
707 )
708 .into());
709 }
710 Ok(self.stream_handle.commit(epoch, metadata).await?)
711 }
712}
713
/// Starts sink writer/coordinator streams served by Java handlers in the embedded JVM.
struct EmbeddedConnectorClient {
    // Obtained via `Jvm::get_or_init`; copied into each worker thread.
    jvm: Jvm,
}

impl EmbeddedConnectorClient {
    /// Obtains (initializing if needed) the embedded JVM.
    fn new() -> Result<Self> {
        let jvm = Jvm::get_or_init().context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink-writer stream served by `JniSinkWriterHandler.runJniSinkWriterThread`.
    ///
    /// The first request is a `Start` carrying `sink_proto` and `payload_schema` (the schema
    /// of the rows that will be sent). The handle is returned only if the first response is
    /// a `Start` response; any other first response is an internal error.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // Given the request receiver, spawn the JVM worker and expose its responses as a stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a coordinator stream served by `JniSinkCoordinatorHandler.runJniSinkCoordinatorThread`.
    ///
    /// The first request is a `Start` carrying `param`. The handle is returned only if the
    /// first response is a `Start` response.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Runs the static Java method `class_name.method_name(long requestRx, long responseTx)`
    /// on a newly spawned OS thread and returns the receiving end of its response channel.
    ///
    /// Java receives raw pointers to `request_rx` and to the response sender. Both are owned
    /// by the spawned closure and stay alive until the blocking Java call returns.
    ///
    /// If the Java call itself fails, the error is only logged. The caller then just sees the
    /// response channel close when the thread exits.
    /// NOTE(review): the JNI error is not forwarded to the consumer; consider surfacing it.
    /// Only an `Err` returned by `execute_with_jni_env` is sent down the channel.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Best effort: if the receiver is already gone there is nobody to notify.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
832
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::remote::CoordinatedRemoteSinkWriter;
    use crate::sink::writer::SinkWriter;

    /// `barrier` and `write_batch` must fail, without emitting any request, when
    /// `begin_epoch` has never been called.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        // The response sender is dropped immediately, so the writer sees a closed response stream.
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
            ",
        );

        // The timeout turns a hang into a test failure instead of blocking forever.
        assert!(
            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
                .await
                .expect("test failed: should not commit without epoch")
                .is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        assert!(
            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
                .await
                .expect("test failed: should not write without epoch")
                .is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Checks the requests emitted across two epochs: epoch tagging, the batch id increasing
    /// across epochs (it is not reset by `begin_epoch`), and the shape of a
    /// non-checkpoint barrier request.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
            ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
            ",
        );

        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2022);
                assert_eq!(batch_id, 0);
                assert_eq!(chunk, chunk_a);
            }
            _ => panic!("test failed: failed to construct write request"),
        }

        // NOTE(review): a Commit response is queued before a non-checkpoint barrier. Whether
        // `barrier(false)` consumes it depends on `SinkWriterStreamHandle::barrier`, which is
        // not visible here; confirm the intent.
        response_sender
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let commit_request = request_receiver.recv().await.unwrap();
        match commit_request {
            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
                request:
                    Some(Request::Barrier(Barrier {
                        epoch,
                        is_checkpoint: false,
                    })),
            }) => {
                assert_eq!(epoch, 2022);
            }
            _ => panic!("test failed: failed to construct sync request "),
        };

        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        match request_receiver.recv().await.unwrap() {
            JniSinkWriterStreamRequest::Chunk {
                epoch,
                batch_id,
                chunk,
            } => {
                assert_eq!(epoch, 2023);
                assert_eq!(batch_id, 1);
                assert_eq!(chunk, chunk_b);
            }
            _ => panic!("test failed: failed to construct write request"),
        }
    }
}