risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId};
32use risingwave_common::session_config::sink_decouple::SinkDecouple;
33use risingwave_common::types::DataType;
34use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
35use risingwave_jni_core::{
36    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
37};
38use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
39use risingwave_pb::connector_service::sink_writer_stream_request::{
40    Request as SinkRequest, StartSink,
41};
42use risingwave_pb::connector_service::{
43    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
44    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
45    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
46    sink_writer_stream_response,
47};
48use risingwave_rpc_client::error::RpcError;
49use risingwave_rpc_client::{
50    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
51    SinkWriterStreamHandle,
52};
53use rw_futures_util::drop_either_future;
54use sea_orm::DatabaseConnection;
55use thiserror_ext::AsReport;
56use tokio::sync::mpsc;
57use tokio::sync::mpsc::{Receiver, unbounded_channel};
58use tokio::task::spawn_blocking;
59use tokio_stream::wrappers::ReceiverStream;
60use tracing::warn;
61
62use super::SinkCommittedEpochSubscriber;
63use super::elasticsearch_opensearch::elasticsearch_converter::{
64    StreamChunkConverter, is_remote_es_sink,
65};
66use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
67use crate::error::ConnectorResult;
68use crate::sink::coordinate::CoordinatedLogSinker;
69use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
70use crate::sink::writer::SinkWriter;
71use crate::sink::{
72    DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
73    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
74};
75
76macro_rules! def_remote_sink {
77    () => {
78        def_remote_sink! {
79            //todo!, delete java impl
80            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
81            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
82            { Cassandra, CassandraSink, "cassandra" }
83            { Jdbc, JdbcSink, "jdbc" }
84            { DeltaLake, DeltaLakeSink, "deltalake" }
85            { HttpJava, HttpJavaSink, "http" }
86        }
87    };
88    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }) => {
89        #[derive(Debug)]
90        pub struct $variant_name;
91        impl RemoteSinkTrait for $variant_name {
92            const SINK_NAME: &'static str = $sink_name;
93        }
94        pub type $sink_type_name = RemoteSink<$variant_name>;
95    };
96    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, |$desc:ident| $body:expr }) => {
97        #[derive(Debug)]
98        pub struct $variant_name;
99        impl RemoteSinkTrait for $variant_name {
100            const SINK_NAME: &'static str = $sink_name;
101            fn default_sink_decouple($desc: &SinkDesc) -> bool {
102                $body
103            }
104        }
105        pub type $sink_type_name = RemoteSink<$variant_name>;
106    };
107    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
108        def_remote_sink! {
109            {$($first)+}
110        }
111        def_remote_sink! {
112            $({$($rest)+})*
113        }
114    };
115    ($($invalid:tt)*) => {
116        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
117    }
118}
119
120def_remote_sink!();
121
/// Compile-time description of one kind of remote sink.
///
/// Implementors are used as the type parameter of the sink types in this module.
pub trait RemoteSinkTrait: Send + Sync + 'static {
    /// Name of the sink; the sink types in this module re-export it as `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// Whether the sink is decoupled when the user leaves the setting at
    /// `SinkDecouple::Default`. The provided implementation returns `true`.
    fn default_sink_decouple() -> bool {
        true
    }
}
128
/// A sink of kind `R` whose log sinker is a [`RemoteLogSinker`].
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created with; cloned for validation and for each log sinker.
    param: SinkParam,
    /// Ties the sink to its `RemoteSinkTrait` implementation without storing a value of it.
    _phantom: PhantomData<R>,
}
134
135impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
136    type Error = SinkError;
137
138    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
139        Ok(Self {
140            param,
141            _phantom: PhantomData,
142        })
143    }
144}
145
146impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
147    type Coordinator = DummySinkCommitCoordinator;
148    type LogSinker = RemoteLogSinker;
149
150    const SINK_NAME: &'static str = R::SINK_NAME;
151
152    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
153        match user_specified {
154            SinkDecouple::Default => Ok(R::default_sink_decouple()),
155            SinkDecouple::Enable => Ok(true),
156            SinkDecouple::Disable => Ok(false),
157        }
158    }
159
160    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
161        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
162    }
163
164    async fn validate(&self) -> Result<()> {
165        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
166        Ok(())
167    }
168}
169
170async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
171    // if sink_name == OpenSearchJavaSink::SINK_NAME {
172    //     risingwave_common::license::Feature::OpenSearchSink
173    //         .check_available()
174    //         .map_err(|e| anyhow::anyhow!(e))?;
175    // }
176    if is_remote_es_sink(sink_name)
177        && param.downstream_pk.len() > 1
178        && !param.properties.contains_key(ES_OPTION_DELIMITER)
179    {
180        bail!("Es sink only supports single pk or pk with delimiter option");
181    }
182    // FIXME: support struct and array in stream sink
183    param.columns.iter().try_for_each(|col| {
184        match &col.data_type {
185            DataType::Int16
186                    | DataType::Int32
187                    | DataType::Int64
188                    | DataType::Float32
189                    | DataType::Float64
190                    | DataType::Boolean
191                    | DataType::Decimal
192                    | DataType::Timestamp
193                    | DataType::Timestamptz
194                    | DataType::Varchar
195                    | DataType::Date
196                    | DataType::Time
197                    | DataType::Interval
198                    | DataType::Jsonb
199                    | DataType::Bytea => Ok(()),
200            DataType::List(list) => {
201                if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
202                    Ok(())
203                } else{
204                    Err(SinkError::Remote(anyhow!(
205                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
206                        col.name,
207                        col.data_type,
208                    )))
209                }
210            },
211            DataType::Struct(_) => {
212                if is_remote_es_sink(sink_name){
213                    Ok(())
214                }else{
215                    Err(SinkError::Remote(anyhow!(
216                        "Only Es sink supports struct, got {:?}: {:?}",
217                        col.name,
218                        col.data_type,
219                    )))
220                }
221            },
222            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
223                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
224                            col.name,
225                            col.data_type,
226                        )))}})?;
227
228    let jvm = JVM.get_or_init()?;
229    let sink_param = param.to_proto();
230
231    spawn_blocking(move || -> anyhow::Result<()> {
232        execute_with_jni_env(jvm, |env| {
233            let validate_sink_request = ValidateSinkRequest {
234                sink_param: Some(sink_param),
235            };
236            let validate_sink_request_bytes =
237                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
238
239            let validate_sink_response_bytes = call_static_method!(
240                env,
241                {com.risingwave.connector.JniSinkValidationHandler},
242                {byte[] validate(byte[] validateSourceRequestBytes)},
243                &validate_sink_request_bytes
244            )?;
245
246            let validate_sink_response: ValidateSinkResponse = Message::decode(
247                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
248            )?;
249
250            validate_sink_response.error.map_or_else(
251                || Ok(()), // If there is no error message, return Ok here.
252                |err| bail!("sink cannot pass validation: {}", err.error_message),
253            )
254        })
255    })
256    .await
257    .context("JoinHandle returns error")??;
258
259    Ok(())
260}
261
/// Log sinker that forwards log store items to a remote sink writer stream and truncates the
/// log according to the responses it receives.
pub struct RemoteLogSinker {
    /// Sending half of the writer stream; carries chunk and barrier requests.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Receiving half of the writer stream; yields the writer's responses.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each stream chunk before it is sent to the writer.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
268
269impl RemoteLogSinker {
270    async fn new(
271        sink_param: SinkParam,
272        writer_param: SinkWriterParam,
273        sink_name: &str,
274    ) -> Result<Self> {
275        let sink_proto = sink_param.to_proto();
276        let payload_schema = if is_remote_es_sink(sink_name) {
277            let columns = vec![
278                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
279                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
280                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
281                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
282            ];
283            Some(TableSchema {
284                columns,
285                pk_indices: vec![],
286            })
287        } else {
288            sink_proto.table_schema.clone()
289        };
290
291        let SinkWriterStreamHandle {
292            request_sender,
293            response_stream,
294        } = EmbeddedConnectorClient::new()?
295            .start_sink_writer_stream(payload_schema, sink_proto)
296            .await?;
297
298        Ok(RemoteLogSinker {
299            request_sender,
300            response_stream,
301            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
302            stream_chunk_converter: StreamChunkConverter::new(
303                sink_name,
304                sink_param.schema(),
305                &sink_param.downstream_pk,
306                &sink_param.properties,
307                sink_param.sink_type.is_append_only(),
308            )?,
309        })
310    }
311}
312
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Forwards items read from `log_reader` to the remote writer and truncates the log as the
    /// writer acknowledges them.
    ///
    /// Two futures are raced with `select`:
    /// - `poll_response_stream` moves every response from the writer stream into an unbounded
    ///   channel;
    /// - `poll_consume_log_and_sink` waits for either a response from that channel or the next
    ///   log item, whichever is ready first.
    ///
    /// Neither future completes successfully, so this method only returns with an error.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Responses are handed from the stream-polling future to the consuming future here.
        let (response_tx, mut response_rx) = unbounded_channel();

        // Pumps the writer's response stream into `response_tx`. It fails when the stream
        // yields an error, when the stream ends, or when the channel can no longer be sent to.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            /// Handles an acknowledgement for `persisted_offset`.
            ///
            /// Queued offsets smaller than `persisted_offset` are discarded, then the next
            /// queued offset must be equal to it; otherwise an error is returned. For a barrier
            /// offset that carries a start time, the elapsed time is recorded in
            /// `sink_commit_duration`. Finally the log reader is truncated at the offset.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                // Skip everything that was sent before the acknowledged offset.
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // The acknowledged offset itself must have been sent.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                // Only barriers queued with a start time contribute to the commit duration,
                // which is observed in milliseconds.
                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item sent; used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Push from back and pop from front
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a response from the writer (left) or the
                // next item of the log (right). The future that did not finish is dropped.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the sending side of the response channel is gone.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk was written: truncate at the matching chunk offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A barrier was handled: truncate at the matching barrier offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Metadata is not used by this sinker; its presence is only logged.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response is treated as an error.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                // Convert the chunk, send it, then remember its offset so the
                                // acknowledgement can be matched later.
                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // A start time is recorded for checkpoint barriers only, so only
                                // those are later measured in `sink_commit_duration`.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Whichever future finishes first provides the result of this method.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
508
/// A sink of kind `R` that uses a [`RemoteCoordinator`] and reports itself as a coordinated
/// sink.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created with; cloned for the writer, the log sinker and the
    /// coordinator.
    param: SinkParam,
    /// Ties the sink to its `RemoteSinkTrait` implementation without storing a value of it.
    _phantom: PhantomData<R>,
}
514
515impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
516    type Error = SinkError;
517
518    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
519        Ok(Self {
520            param,
521            _phantom: PhantomData,
522        })
523    }
524}
525
526impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
527    type Coordinator = RemoteCoordinator;
528    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
529
530    const SINK_NAME: &'static str = R::SINK_NAME;
531
532    async fn validate(&self) -> Result<()> {
533        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
534        Ok(())
535    }
536
537    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
538        let metrics = SinkWriterMetrics::new(&writer_param);
539        CoordinatedLogSinker::new(
540            &writer_param,
541            self.param.clone(),
542            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
543            NonZero::new(1).unwrap(),
544        )
545        .await
546    }
547
548    fn is_coordinated_sink(&self) -> bool {
549        true
550    }
551
552    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
553        RemoteCoordinator::new::<R>(self.param.clone()).await
554    }
555}
556
/// Sink writer that sends chunks and barriers to a remote sink writer stream and returns the
/// metadata of checkpoint commits to its caller.
pub struct CoordinatedRemoteSinkWriter {
    /// Epoch set by the latest `begin_epoch` call; `None` until the first call.
    epoch: Option<u64>,
    /// Id attached to the next chunk request; incremented after each chunk is sent.
    batch_id: u64,
    /// Request/response stream to the remote writer.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated when batches are written.
    metrics: SinkWriterMetrics,
}
563
564impl CoordinatedRemoteSinkWriter {
565    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
566        let sink_proto = param.to_proto();
567        let stream_handle = EmbeddedConnectorClient::new()?
568            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
569            .await?;
570
571        Ok(Self {
572            epoch: None,
573            batch_id: 0,
574            stream_handle,
575            metrics,
576        })
577    }
578
579    #[cfg(test)]
580    fn for_test(
581        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
582        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
583    ) -> CoordinatedRemoteSinkWriter {
584        use futures::StreamExt;
585
586        let stream_handle = SinkWriterStreamHandle::for_test(
587            request_sender,
588            ReceiverStream::new(response_receiver)
589                .map_err(RpcError::from)
590                .boxed(),
591        );
592
593        CoordinatedRemoteSinkWriter {
594            epoch: None,
595            batch_id: 0,
596            stream_handle,
597            metrics: SinkWriterMetrics::for_test(),
598        }
599    }
600}
601
602#[async_trait]
603impl SinkWriter for CoordinatedRemoteSinkWriter {
604    type CommitMetadata = Option<SinkMetadata>;
605
606    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
607        let cardinality = chunk.cardinality();
608        self.metrics
609            .connector_sink_rows_received
610            .inc_by(cardinality as _);
611
612        let epoch = self.epoch.ok_or_else(|| {
613            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
614        })?;
615        let batch_id = self.batch_id;
616        self.stream_handle
617            .request_sender
618            .send_request(JniSinkWriterStreamRequest::Chunk {
619                chunk,
620                epoch,
621                batch_id,
622            })
623            .await?;
624        self.batch_id += 1;
625        Ok(())
626    }
627
628    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
629        self.epoch = Some(epoch);
630        Ok(())
631    }
632
633    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
634        let epoch = self.epoch.ok_or_else(|| {
635            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
636        })?;
637        if is_checkpoint {
638            // TODO: add metrics to measure commit time
639            let rsp = self.stream_handle.commit(epoch).await?;
640            rsp.metadata
641                .ok_or_else(|| {
642                    SinkError::Remote(anyhow!(
643                        "get none metadata in commit response for coordinated sink writer"
644                    ))
645                })
646                .map(Some)
647        } else {
648            self.stream_handle.barrier(epoch).await?;
649            Ok(None)
650        }
651    }
652}
653
/// Commit coordinator that forwards commits to a remote sink coordinator stream.
pub struct RemoteCoordinator {
    /// Request/response stream to the remote coordinator.
    stream_handle: SinkCoordinatorStreamHandle,
}
657
658impl RemoteCoordinator {
659    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
660        let stream_handle = EmbeddedConnectorClient::new()?
661            .start_sink_coordinator_stream(param.clone())
662            .await?;
663
664        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
665
666        Ok(RemoteCoordinator { stream_handle })
667    }
668}
669
670#[async_trait]
671impl SinkCommitCoordinator for RemoteCoordinator {
672    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
673        Ok(None)
674    }
675
676    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
677        Ok(self.stream_handle.commit(epoch, metadata).await?)
678    }
679}
680
/// Client that starts sink writer and sink coordinator streams by calling into the JVM.
struct EmbeddedConnectorClient {
    /// The JVM obtained from `JVM.get_or_init()`.
    jvm: &'static JavaVM,
}
684
685impl EmbeddedConnectorClient {
686    fn new() -> Result<Self> {
687        let jvm = JVM
688            .get_or_init()
689            .context("failed to create EmbeddedConnectorClient")?;
690        Ok(EmbeddedConnectorClient { jvm })
691    }
692
693    async fn start_sink_writer_stream(
694        &self,
695        payload_schema: Option<TableSchema>,
696        sink_proto: PbSinkParam,
697    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
698        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
699            SinkWriterStreamRequest {
700                request: Some(SinkRequest::Start(StartSink {
701                    sink_param: Some(sink_proto),
702                    payload_schema,
703                })),
704            },
705            |rx| async move {
706                let rx = self.start_jvm_worker_thread(
707                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
708                    "runJniSinkWriterThread",
709                    rx,
710                );
711                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
712            },
713        )
714        .await?;
715
716        match first_rsp {
717            SinkWriterStreamResponse {
718                response: Some(sink_writer_stream_response::Response::Start(_)),
719            } => Ok(handle),
720            msg => Err(SinkError::Internal(anyhow!(
721                "should get start response but get {:?}",
722                msg
723            ))),
724        }
725    }
726
727    async fn start_sink_coordinator_stream(
728        &self,
729        param: SinkParam,
730    ) -> Result<SinkCoordinatorStreamHandle> {
731        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
732            SinkCoordinatorStreamRequest {
733                request: Some(sink_coordinator_stream_request::Request::Start(
734                    StartCoordinator {
735                        param: Some(param.to_proto()),
736                    },
737                )),
738            },
739            |rx| async move {
740                let rx = self.start_jvm_worker_thread(
741                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
742                    "runJniSinkCoordinatorThread",
743                    rx,
744                );
745                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
746            },
747        )
748        .await?;
749
750        match first_rsp {
751            SinkCoordinatorStreamResponse {
752                response: Some(sink_coordinator_stream_response::Response::Start(_)),
753            } => Ok(handle),
754            msg => Err(SinkError::Internal(anyhow!(
755                "should get start response but get {:?}",
756                msg
757            ))),
758        }
759    }
760
761    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
762        &self,
763        class_name: &'static str,
764        method_name: &'static str,
765        mut request_rx: JniReceiverType<REQ>,
766    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
767        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
768            mpsc::channel(DEFAULT_BUFFER_SIZE);
769
770        let jvm = self.jvm;
771        std::thread::spawn(move || {
772            let result = execute_with_jni_env(jvm, |env| {
773                let result = call_static_method!(
774                    env,
775                    class_name,
776                    method_name,
777                    {{void}, {long requestRx, long responseTx}},
778                    &mut request_rx as *mut JniReceiverType<REQ>,
779                    &mut response_tx as *mut JniSenderType<RSP>
780                );
781
782                match result {
783                    Ok(_) => {
784                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
785                    }
786                    Err(e) => {
787                        tracing::error!(error = %e.as_report(), "jni call error");
788                    }
789                };
790
791                Ok(())
792            });
793
794            if let Err(e) = result {
795                let _ = response_tx.blocking_send(Err(e));
796            }
797        });
798        response_rx
799    }
800}
801
// Unit tests for `CoordinatedRemoteSinkWriter`, driven through in-memory channels
// instead of a real JVM worker.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// `barrier` and `write_batch` called before any `begin_epoch` must each return an
    /// error, and neither may put a request on the request channel.
    #[tokio::test]
    async fn test_epoch_check() {
        let (req_tx, mut req_rx) = mpsc::channel(16);
        // Only the receiving half of the response channel is kept; the sender is dropped
        // right here.
        let (_, rsp_rx) = mpsc::channel(16);
        let mut writer = CoordinatedRemoteSinkWriter::for_test(rsp_rx, req_tx);

        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
        ",
        );

        // Barrier without an epoch: must finish within the timeout and report an error.
        let barrier_outcome = tokio::time::timeout(Duration::from_secs(10), writer.barrier(true))
            .await
            .expect("test failed: should not commit without epoch");
        assert!(
            barrier_outcome.is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            req_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // Write without an epoch: same expectations.
        let write_outcome = tokio::time::timeout(Duration::from_secs(1), writer.write_batch(chunk))
            .await
            .expect("test failed: should not write without epoch");
        assert!(
            write_outcome.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            req_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Walks the writer through write -> barrier -> next epoch -> write and checks both the
    /// writer's `epoch` / `batch_id` fields and the requests it emits.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_sender, mut request_receiver) = mpsc::channel(16);
        let (response_sender, response_receiver) = mpsc::channel(16);
        let mut writer = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
        ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
        ",
        );

        // First epoch: a single write.
        writer.begin_epoch(2022).await.unwrap();
        assert_eq!(writer.epoch, Some(2022));

        writer.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(writer.epoch, Some(2022));
        assert_eq!(writer.batch_id, 1);

        // The emitted request carries batch id 0 while the writer's counter is already 1.
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2022);
        assert_eq!(batch_id, 0);
        assert_eq!(chunk, chunk_a);

        // Queue a commit response for epoch 2022 before issuing the non-checkpoint barrier.
        let commit_response = SinkWriterStreamResponse {
            response: Some(Response::Commit(CommitResponse {
                epoch: 2022,
                metadata: None,
            })),
        };
        response_sender
            .send(Ok(commit_response))
            .await
            .expect("test failed: failed to sync epoch");
        writer.barrier(false).await.unwrap();

        // The barrier must surface as a non-checkpoint `Barrier` request for the same epoch.
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch,
                    is_checkpoint: false,
                })),
        }) = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(epoch, 2022);

        // Second epoch.
        writer.begin_epoch(2023).await.unwrap();
        assert_eq!(writer.epoch, Some(2023));

        // Second write: the batch id keeps counting up across epochs.
        writer.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(writer.epoch, Some(2023));
        assert_eq!(writer.batch_id, 2);

        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_receiver.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2023);
        assert_eq!(batch_id, 1);
        assert_eq!(chunk, chunk_b);
    }
}