1use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use phf::phf_set;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId, Field};
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
36use risingwave_jni_core::{
37 JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41 Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44 PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45 SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46 ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47 sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51 BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52 SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use thiserror_ext::AsReport;
56use tokio::sync::mpsc;
57use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
58use tokio::task::spawn_blocking;
59use tokio_stream::wrappers::ReceiverStream;
60use tracing::warn;
61
62use super::elasticsearch_opensearch::elasticsearch_converter::{
63 StreamChunkConverter, is_remote_es_sink,
64};
65use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
66use crate::connector_common::IcebergSinkCompactionUpdate;
67use crate::enforce_secret::EnforceSecret;
68use crate::error::ConnectorResult;
69use crate::sink::coordinate::CoordinatedLogSinker;
70use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
71use crate::sink::writer::SinkWriter;
72use crate::sink::{
73 LogSinker, Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError,
74 SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
75};
76
/// Declares the remote sinks that are implemented by the embedded Java connector.
///
/// Invoked with no arguments, it expands to the built-in list of connectors below. Each entry
/// `{ Variant, SinkTypeName, "sink_name", [ "secret.prop", ... ] }` generates:
/// - a unit marker struct `Variant` implementing [`RemoteSinkTrait`] (providing `SINK_NAME`)
///   and [`EnforceSecret`] (providing the listed secret property names), and
/// - a type alias `SinkTypeName = RemoteSink<Variant>`.
///
/// Several entries are handled by expanding the first one and recursing on the rest.
macro_rules! def_remote_sink {
    // Entry point: the built-in list of remote sinks.
    () => {
        def_remote_sink! {
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // A single entry that keeps the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A single entry with a custom decouple default written as `|desc| body`.
    // NOTE(review): this arm emits `fn default_sink_decouple($desc: &SinkDesc) -> bool`, but
    // `RemoteSinkTrait::default_sink_decouple` takes no arguments and `SinkDesc` is not imported
    // here. No visible entry uses this arm; any entry that does would presumably fail to
    // compile — confirm before relying on it.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Two or more entries: expand the first one, then recurse on the remainder.
    // NOTE(review): a single entry that matches neither arm above also lands here with an empty
    // remainder. It then re-invokes itself (and the `()` arm) instead of reaching the
    // `compile_error!` fallback below.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Fallback: report the unrecognised input at compile time.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}

// Instantiate the built-in remote sinks.
def_remote_sink!();
130
131pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
132 const SINK_NAME: &'static str;
133 fn default_sink_decouple() -> bool {
134 true
135 }
136}
137
138#[derive(Debug)]
139pub struct RemoteSink<R: RemoteSinkTrait> {
140 param: SinkParam,
141 _phantom: PhantomData<R>,
142}
143
144impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
145 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
146}
147
148impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
149 type Error = SinkError;
150
151 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
152 Ok(Self {
153 param,
154 _phantom: PhantomData,
155 })
156 }
157}
158
159impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
160 type LogSinker = RemoteLogSinker;
161
162 const SINK_NAME: &'static str = R::SINK_NAME;
163
164 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
165 match user_specified {
166 SinkDecouple::Default => Ok(R::default_sink_decouple()),
167 SinkDecouple::Enable => Ok(true),
168 SinkDecouple::Disable => Ok(false),
169 }
170 }
171
172 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
173 RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
174 }
175
176 async fn validate(&self) -> Result<()> {
177 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
178 Ok(())
179 }
180}
181
182async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
183 if is_remote_es_sink(sink_name)
189 && param.downstream_pk_or_empty().len() > 1
190 && !param.properties.contains_key(ES_OPTION_DELIMITER)
191 {
192 bail!("Es sink only supports single pk or pk with delimiter option");
193 }
194 param.columns.iter().try_for_each(|col| {
196 match &col.data_type {
197 DataType::Int16
198 | DataType::Int32
199 | DataType::Int64
200 | DataType::Float32
201 | DataType::Float64
202 | DataType::Boolean
203 | DataType::Decimal
204 | DataType::Timestamp
205 | DataType::Timestamptz
206 | DataType::Varchar
207 | DataType::Date
208 | DataType::Time
209 | DataType::Interval
210 | DataType::Jsonb
211 | DataType::Bytea => Ok(()),
212 DataType::List(list) => {
213 if is_remote_es_sink(sink_name) || matches!(list.elem(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
214 Ok(())
215 } else{
216 Err(SinkError::Remote(anyhow!(
217 "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
218 col.name,
219 col.data_type,
220 )))
221 }
222 },
223 DataType::Struct(_) => {
224 if is_remote_es_sink(sink_name){
225 Ok(())
226 }else{
227 Err(SinkError::Remote(anyhow!(
228 "Only Es sink supports struct, got {:?}: {:?}",
229 col.name,
230 col.data_type,
231 )))
232 }
233 },
234 DataType::Vector(_) |
235 DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
236 "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
237 col.name,
238 col.data_type,
239 )))}})?;
240
241 let jvm = Jvm::get_or_init()?;
242 let sink_param = param.to_proto();
243
244 spawn_blocking(move || -> anyhow::Result<()> {
245 execute_with_jni_env(jvm, |env| {
246 let validate_sink_request = ValidateSinkRequest {
247 sink_param: Some(sink_param),
248 };
249 let validate_sink_request_bytes =
250 env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
251
252 let validate_sink_response_bytes = call_static_method!(
253 env,
254 {com.risingwave.connector.JniSinkValidationHandler},
255 {byte[] validate(byte[] validateSourceRequestBytes)},
256 &validate_sink_request_bytes
257 )?;
258
259 let validate_sink_response: ValidateSinkResponse = Message::decode(
260 risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
261 )?;
262
263 validate_sink_response.error.map_or_else(
264 || Ok(()), |err| bail!("sink cannot pass validation: {}", err.error_message),
266 )
267 })
268 })
269 .await
270 .context("JoinHandle returns error")??;
271
272 Ok(())
273}
274
275pub struct RemoteLogSinker {
276 request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
277 response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
278 stream_chunk_converter: StreamChunkConverter,
279 sink_writer_metrics: SinkWriterMetrics,
280}
281
282impl RemoteLogSinker {
283 async fn new(
284 sink_param: SinkParam,
285 writer_param: SinkWriterParam,
286 sink_name: &str,
287 ) -> Result<Self> {
288 let sink_proto = sink_param.to_proto();
289 let payload_schema = if is_remote_es_sink(sink_name) {
290 let columns = vec![
291 ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
292 ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
293 ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
294 ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
295 ];
296 Some(TableSchema {
297 columns,
298 pk_indices: vec![],
299 })
300 } else {
301 sink_proto.table_schema.clone()
302 };
303
304 let SinkWriterStreamHandle {
305 request_sender,
306 response_stream,
307 } = EmbeddedConnectorClient::new()?
308 .start_sink_writer_stream(payload_schema, sink_proto)
309 .await?;
310
311 Ok(RemoteLogSinker {
312 request_sender,
313 response_stream,
314 sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
315 stream_chunk_converter: StreamChunkConverter::new(
316 sink_name,
317 sink_param.schema(),
318 &sink_param.downstream_pk_or_empty(),
319 &sink_param.properties,
320 sink_param.sink_type.is_append_only(),
321 )?,
322 })
323 }
324}
325
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Streams log store items to the Java sink writer and truncates the log store as the Java
    /// side acknowledges them. Never returns `Ok` (the `Ok` type is `!`).
    ///
    /// Two loops run concurrently, and the first one to finish decides the result:
    /// - `poll_response_stream` relays responses from the Java side into an unbounded channel.
    ///   It fails on a stream error or when the stream ends.
    /// - `poll_consume_log_and_sink` reads items from `log_reader` and sends them to the Java
    ///   writer. It also matches incoming responses against the offsets sent so far.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Relay channel: lets the consume loop `select` on responses alongside the log reader.
        let (response_tx, mut response_rx) = unbounded_channel();

        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        // Fails only if `response_rx` (owned by the consume loop) was dropped.
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    // The Java side is not expected to close the stream while sinking.
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            /// Matches an acknowledged offset against the queue of sent offsets and truncates
            /// the log store up to it.
            ///
            /// Sent offsets older than `persisted_offset` are discarded from the front without
            /// their own acknowledgement. The next queued offset must then equal
            /// `persisted_offset`; otherwise this returns an error. For a barrier with a
            /// recorded start time (checkpoints only), the elapsed time is observed as the
            /// commit duration.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item read from the log store; each new offset is checked
            // against it.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent to the Java side and not yet acknowledged, in send order. The
            // `Instant` is set only for checkpoint barriers, to time the commit.
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a relayed response or the next log store
                // item. The future of the other branch is dropped.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the relay sender was dropped.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk (batch) was written by the Java side.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // An epoch was committed. Metadata is not used by this
                            // (uncoordinated) sinker, so it is only logged.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                // Presumably rejects offsets that do not follow the previous
                                // one — see `TruncateOffset::check_next_offset`.
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                // Only record the offset once the chunk has been sent.
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Checkpoint barriers remember when they were sent, so the
                                // commit duration can be observed once they are acknowledged.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Run both loops; the first to finish (only ever with an error) provides the result,
        // and the other is dropped.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
521
522#[derive(Debug)]
523pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
524 param: SinkParam,
525 _phantom: PhantomData<R>,
526}
527
528impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
529 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
530}
531
532impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
533 type Error = SinkError;
534
535 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
536 Ok(Self {
537 param,
538 _phantom: PhantomData,
539 })
540 }
541}
542
543impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
544 type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
545
546 const SINK_NAME: &'static str = R::SINK_NAME;
547
548 async fn validate(&self) -> Result<()> {
549 validate_remote_sink(&self.param, Self::SINK_NAME).await?;
550 Ok(())
551 }
552
553 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
554 let metrics = SinkWriterMetrics::new(&writer_param);
555 CoordinatedLogSinker::new(
556 &writer_param,
557 self.param.clone(),
558 CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
559 NonZero::new(1).unwrap(),
560 )
561 .await
562 }
563
564 fn is_coordinated_sink(&self) -> bool {
565 true
566 }
567
568 async fn new_coordinator(
569 &self,
570 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
571 ) -> Result<SinkCommitCoordinator> {
572 let coordinator = RemoteCoordinator::new::<R>(self.param.clone()).await?;
573 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
574 }
575}
576
577pub struct CoordinatedRemoteSinkWriter {
578 epoch: Option<u64>,
579 batch_id: u64,
580 stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
581 metrics: SinkWriterMetrics,
582}
583
584impl CoordinatedRemoteSinkWriter {
585 pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
586 let sink_proto = param.to_proto();
587 let stream_handle = EmbeddedConnectorClient::new()?
588 .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
589 .await?;
590
591 Ok(Self {
592 epoch: None,
593 batch_id: 0,
594 stream_handle,
595 metrics,
596 })
597 }
598
599 #[cfg(test)]
600 fn for_test(
601 response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
602 request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
603 ) -> CoordinatedRemoteSinkWriter {
604 use futures::StreamExt;
605
606 let stream_handle = SinkWriterStreamHandle::for_test(
607 request_sender,
608 ReceiverStream::new(response_receiver)
609 .map_err(RpcError::from)
610 .boxed(),
611 );
612
613 CoordinatedRemoteSinkWriter {
614 epoch: None,
615 batch_id: 0,
616 stream_handle,
617 metrics: SinkWriterMetrics::for_test(),
618 }
619 }
620}
621
622#[async_trait]
623impl SinkWriter for CoordinatedRemoteSinkWriter {
624 type CommitMetadata = Option<SinkMetadata>;
625
626 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
627 let cardinality = chunk.cardinality();
628 self.metrics
629 .connector_sink_rows_received
630 .inc_by(cardinality as _);
631
632 let epoch = self.epoch.ok_or_else(|| {
633 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
634 })?;
635 let batch_id = self.batch_id;
636 self.stream_handle
637 .request_sender
638 .send_request(JniSinkWriterStreamRequest::Chunk {
639 chunk,
640 epoch,
641 batch_id,
642 })
643 .await?;
644 self.batch_id += 1;
645 Ok(())
646 }
647
648 async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
649 self.epoch = Some(epoch);
650 Ok(())
651 }
652
653 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
654 let epoch = self.epoch.ok_or_else(|| {
655 SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
656 })?;
657 if is_checkpoint {
658 let rsp = self.stream_handle.commit(epoch).await?;
660 rsp.metadata
661 .ok_or_else(|| {
662 SinkError::Remote(anyhow!(
663 "get none metadata in commit response for coordinated sink writer"
664 ))
665 })
666 .map(Some)
667 } else {
668 self.stream_handle.barrier(epoch).await?;
669 Ok(None)
670 }
671 }
672}
673
674pub struct RemoteCoordinator {
675 stream_handle: SinkCoordinatorStreamHandle,
676}
677
678impl RemoteCoordinator {
679 pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
680 let stream_handle = EmbeddedConnectorClient::new()?
681 .start_sink_coordinator_stream(param.clone())
682 .await?;
683
684 tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
685
686 Ok(RemoteCoordinator { stream_handle })
687 }
688}
689
690#[async_trait]
691impl SinglePhaseCommitCoordinator for RemoteCoordinator {
692 async fn init(&mut self) -> Result<()> {
693 Ok(())
694 }
695
696 async fn commit(
697 &mut self,
698 epoch: u64,
699 metadata: Vec<SinkMetadata>,
700 add_columns: Option<Vec<Field>>,
701 ) -> Result<()> {
702 if let Some(add_columns) = add_columns {
703 return Err(anyhow!(
704 "remote coordinator not support add columns, but got: {:?}",
705 add_columns
706 )
707 .into());
708 }
709 Ok(self.stream_handle.commit(epoch, metadata).await?)
710 }
711}
712
/// Client for the Java connector embedded in this process, reached through JNI.
struct EmbeddedConnectorClient {
    jvm: Jvm,
}

impl EmbeddedConnectorClient {
    /// Obtains (initializing if necessary) the process JVM.
    fn new() -> Result<Self> {
        let jvm = Jvm::get_or_init().context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Opens a bidirectional sink writer stream.
    ///
    /// It does three things:
    /// - builds a `Start` request carrying `sink_proto` and `payload_schema`, presumably sent
    ///   as the first request by `SinkWriterStreamHandle::initialize`;
    /// - runs `JniSinkWriterHandler.runJniSinkWriterThread` on a worker thread;
    /// - requires the first response to be a `Start` acknowledgement.
    ///
    /// # Errors
    /// Fails if stream initialization fails or the first response is not `Start`.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Opens a bidirectional sink coordinator stream.
    ///
    /// It mirrors `start_sink_writer_stream`, but runs
    /// `JniSinkCoordinatorHandler.runJniSinkCoordinatorThread` and requires a `Start`
    /// acknowledgement as the first coordinator response.
    ///
    /// # Errors
    /// Fails if stream initialization fails or the first response is not `Start`.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Runs the static Java method `class_name.method_name` on a new OS thread and returns the
    /// receiving end of its response channel.
    ///
    /// The Java method receives the addresses of `request_rx` and of the response sender as
    /// `long`s. Presumably it pulls requests and pushes responses through them via JNI
    /// callbacks until it returns — confirm against the Java handler. Both values are owned
    /// by the spawned thread, so the returned receiver sees the channel close when that
    /// thread exits.
    ///
    /// Error reporting:
    /// - if `execute_with_jni_env` itself returns an error, that error is sent on the response
    ///   channel;
    /// - an error from the Java method call is only logged.
    ///
    /// NOTE(review): forwarding the Java call error through `response_tx` would give consumers
    /// a more specific error than end-of-stream.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        // The thread is detached; it runs until the Java method returns.
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                // Raw pointers to this thread's locals are handed to Java. This relies on the
                // call blocking until Java is done with them.
                // NOTE(review): also assumes Java does not keep the pointers after returning.
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                // The Java call's outcome was logged above and is not propagated from here.
                Ok(())
            });

            if let Err(e) = result {
                // Ignore a send failure: the receiver may already have been dropped.
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
831
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::remote::CoordinatedRemoteSinkWriter;
    use crate::sink::writer::SinkWriter;

    /// Splits a chunk request into `(epoch, batch_id, chunk)`, failing the test otherwise.
    fn expect_chunk_request(request: JniSinkWriterStreamRequest) -> (u64, u64, StreamChunk) {
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request
        else {
            panic!("test failed: failed to construct write request");
        };
        (epoch, batch_id, chunk)
    }

    /// Without `begin_epoch`, both commit and write must fail without sending any request.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_sender, mut request_recv) = mpsc::channel(16);
        // The response sender is dropped immediately; no response is ever expected here.
        let (_, resp_recv) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
            ",
        );

        let commit_result = tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
            .await
            .expect("test failed: should not commit without epoch");
        assert!(
            commit_result.is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        let write_result = tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
            .await
            .expect("test failed: should not write without epoch");
        assert!(
            write_result.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_recv.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Two epochs: chunks carry the right epoch and a batch id that keeps increasing, and a
    /// non-checkpoint barrier is forwarded as a `Barrier` request.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
            ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
            ",
        );

        // Epoch 2022: one chunk, then a non-checkpoint barrier.
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        let (epoch, batch_id, chunk) =
            expect_chunk_request(request_receiver.recv().await.unwrap());
        assert_eq!(epoch, 2022);
        assert_eq!(batch_id, 0);
        assert_eq!(chunk, chunk_a);

        response_sender
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch,
                    is_checkpoint: false,
                })),
        }) = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(epoch, 2022);

        // Epoch 2023: the batch id continues from the previous epoch.
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        let (epoch, batch_id, chunk) =
            expect_chunk_request(request_receiver.recv().await.unwrap());
        assert_eq!(epoch, 2023);
        assert_eq!(batch_id, 1);
        assert_eq!(chunk, chunk_b);
    }
}