risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
36use risingwave_jni_core::{
37    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41    Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47    sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52    SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use sea_orm::DatabaseConnection;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::SinkCommittedEpochSubscriber;
64use super::elasticsearch_opensearch::elasticsearch_converter::{
65    StreamChunkConverter, is_remote_es_sink,
66};
67use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
68use crate::connector_common::IcebergSinkCompactionUpdate;
69use crate::enforce_secret::EnforceSecret;
70use crate::error::ConnectorResult;
71use crate::sink::coordinate::CoordinatedLogSinker;
72use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
73use crate::sink::writer::SinkWriter;
74use crate::sink::{
75    DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
76    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
77};
78
/// Declares the remote sink variants defined in this file.
///
/// For every `{ Variant, SinkAlias, "name", [ secret props ] }` entry the macro emits:
/// - a unit struct `Variant`,
/// - `impl RemoteSinkTrait for Variant` with `SINK_NAME` set to `"name"`,
/// - `impl EnforceSecret for Variant` whose `ENFORCE_SECRET_PROPERTIES` is the listed set,
/// - `pub type SinkAlias = RemoteSink<Variant>`.
///
/// Invoking the macro with no arguments expands the built-in list in the first arm.
macro_rules! def_remote_sink {
    // Entry point: expands to the list of sinks that are currently defined.
    () => {
        def_remote_sink! {
            //todo!, delete java impl
            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // A single entry without a custom decouple body: `default_sink_decouple` is left to the
    // trait's default implementation.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A single entry with a trailing `|desc| body` that overrides `default_sink_decouple`.
    // NOTE(review): this arm emits `default_sink_decouple($desc: &SinkDesc)`, whereas
    // `RemoteSinkTrait::default_sink_decouple` as declared in this file takes no arguments, and
    // `SinkDesc` is not among the imports visible here. No entry in the built-in list uses this
    // arm — confirm whether it is still needed before relying on it.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // A sequence of brace-delimited entries: expand the first one on its own, then recurse on
    // the remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Any other input is rejected at compile time, echoing the offending tokens.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}
130
// Instantiate the built-in remote sink variants listed in the macro's empty-argument arm.
def_remote_sink!();
132
/// Marker trait implemented by each remote sink variant.
///
/// It carries the per-variant constants that `RemoteSink<R>` and `CoordinatedRemoteSink<R>`
/// forward to their `Sink` implementations.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Name of the sink; forwarded as `Sink::SINK_NAME` by the generic sink wrappers.
    const SINK_NAME: &'static str;
    /// Whether sink decoupling is enabled when the user leaves the setting at
    /// `SinkDecouple::Default`. Defaults to `true`.
    fn default_sink_decouple() -> bool {
        true
    }
}
139
/// A non-coordinated remote sink, parameterized by the variant marker `R`.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; cloned into the log sinker and passed to validation.
    param: SinkParam,
    /// Ties the struct to its variant marker without storing a value of `R`.
    _phantom: PhantomData<R>,
}
145
/// Forwards the secret-enforced property set of the variant marker `R`.
impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
149
150impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
151    type Error = SinkError;
152
153    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
154        Ok(Self {
155            param,
156            _phantom: PhantomData,
157        })
158    }
159}
160
161impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
162    type Coordinator = DummySinkCommitCoordinator;
163    type LogSinker = RemoteLogSinker;
164
165    const SINK_NAME: &'static str = R::SINK_NAME;
166
167    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
168        match user_specified {
169            SinkDecouple::Default => Ok(R::default_sink_decouple()),
170            SinkDecouple::Enable => Ok(true),
171            SinkDecouple::Disable => Ok(false),
172        }
173    }
174
175    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
176        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
177    }
178
179    async fn validate(&self) -> Result<()> {
180        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
181        Ok(())
182    }
183}
184
185async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
186    // if sink_name == OpenSearchJavaSink::SINK_NAME {
187    //     risingwave_common::license::Feature::OpenSearchSink
188    //         .check_available()
189    //         .map_err(|e| anyhow::anyhow!(e))?;
190    // }
191    if is_remote_es_sink(sink_name)
192        && param.downstream_pk.len() > 1
193        && !param.properties.contains_key(ES_OPTION_DELIMITER)
194    {
195        bail!("Es sink only supports single pk or pk with delimiter option");
196    }
197    // FIXME: support struct and array in stream sink
198    param.columns.iter().try_for_each(|col| {
199        match &col.data_type {
200            DataType::Int16
201                    | DataType::Int32
202                    | DataType::Int64
203                    | DataType::Float32
204                    | DataType::Float64
205                    | DataType::Boolean
206                    | DataType::Decimal
207                    | DataType::Timestamp
208                    | DataType::Timestamptz
209                    | DataType::Varchar
210                    | DataType::Date
211                    | DataType::Time
212                    | DataType::Interval
213                    | DataType::Jsonb
214                    | DataType::Bytea => Ok(()),
215            DataType::List(list) => {
216                if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
217                    Ok(())
218                } else{
219                    Err(SinkError::Remote(anyhow!(
220                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
221                        col.name,
222                        col.data_type,
223                    )))
224                }
225            },
226            DataType::Struct(_) => {
227                if is_remote_es_sink(sink_name){
228                    Ok(())
229                }else{
230                    Err(SinkError::Remote(anyhow!(
231                        "Only Es sink supports struct, got {:?}: {:?}",
232                        col.name,
233                        col.data_type,
234                    )))
235                }
236            },
237            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
238            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
239                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
240                            col.name,
241                            col.data_type,
242                        )))}})?;
243
244    let jvm = JVM.get_or_init()?;
245    let sink_param = param.to_proto();
246
247    spawn_blocking(move || -> anyhow::Result<()> {
248        execute_with_jni_env(jvm, |env| {
249            let validate_sink_request = ValidateSinkRequest {
250                sink_param: Some(sink_param),
251            };
252            let validate_sink_request_bytes =
253                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
254
255            let validate_sink_response_bytes = call_static_method!(
256                env,
257                {com.risingwave.connector.JniSinkValidationHandler},
258                {byte[] validate(byte[] validateSourceRequestBytes)},
259                &validate_sink_request_bytes
260            )?;
261
262            let validate_sink_response: ValidateSinkResponse = Message::decode(
263                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
264            )?;
265
266            validate_sink_response.error.map_or_else(
267                || Ok(()), // If there is no error message, return Ok here.
268                |err| bail!("sink cannot pass validation: {}", err.error_message),
269            )
270        })
271    })
272    .await
273    .context("JoinHandle returns error")??;
274
275    Ok(())
276}
277
/// Log sinker that forwards log store items to a remote sink writer over a bidirectional
/// stream and truncates the log as the writer acknowledges them.
pub struct RemoteLogSinker {
    /// Sending half of the sink writer stream; carries chunks and barriers.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Receiving half of the sink writer stream; yields the writer's responses.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Applied to every stream chunk before it is sent to the writer.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
284
285impl RemoteLogSinker {
286    async fn new(
287        sink_param: SinkParam,
288        writer_param: SinkWriterParam,
289        sink_name: &str,
290    ) -> Result<Self> {
291        let sink_proto = sink_param.to_proto();
292        let payload_schema = if is_remote_es_sink(sink_name) {
293            let columns = vec![
294                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
295                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
296                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
297                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
298            ];
299            Some(TableSchema {
300                columns,
301                pk_indices: vec![],
302            })
303        } else {
304            sink_proto.table_schema.clone()
305        };
306
307        let SinkWriterStreamHandle {
308            request_sender,
309            response_stream,
310        } = EmbeddedConnectorClient::new()?
311            .start_sink_writer_stream(payload_schema, sink_proto)
312            .await?;
313
314        Ok(RemoteLogSinker {
315            request_sender,
316            response_stream,
317            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
318            stream_chunk_converter: StreamChunkConverter::new(
319                sink_name,
320                sink_param.schema(),
321                &sink_param.downstream_pk,
322                &sink_param.properties,
323                sink_param.sink_type.is_append_only(),
324            )?,
325        })
326    }
327}
328
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the sink: reads items from the log reader, sends them to the remote writer, and
    /// truncates the log reader as the writer acknowledges batches and commits.
    ///
    /// Two futures are raced against each other:
    /// - one forwards every response from the writer's response stream into an in-process
    ///   unbounded channel;
    /// - the other alternates between handling a forwarded response and sending the next log
    ///   item.
    ///
    /// Neither future completes successfully; the function returns the error of whichever
    /// finishes first.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        let (response_tx, mut response_rx) = unbounded_channel();

        // Forwards writer responses into `response_tx`. Ends with an error when the response
        // stream yields an error, reaches its end, or the channel receiver is gone.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Matches an acknowledged offset against the queue of sent offsets and truncates
            // the log reader at that offset.
            //
            // Queue entries strictly smaller than `persisted_offset` are discarded first. The
            // next entry must then be exactly `persisted_offset`; an empty queue or a
            // different offset is an error. For a barrier offset that has a recorded start
            // time, the elapsed time is observed in `sink_commit_duration`.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the most recently sent item; used to check that offsets advance in
            // the expected order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Push from back and pop from front
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever is ready first: a forwarded writer response (left) or
                // the next log item (right). The future that did not complete is dropped.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the forwarding channel was closed.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A batch was written: truncate at the matching chunk offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // An epoch was committed: truncate at the matching barrier offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Commit metadata is not used on this path; only warn about it.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response kind is treated as a protocol error.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                // Convert the chunk before sending it to the writer.
                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                // Chunks carry no start time; only checkpoint barriers are timed.
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // The start time is recorded for checkpoint barriers only, so
                                // that the commit duration can be observed on acknowledgement.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Race both futures and return the output of whichever finishes first.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
524
/// A remote sink whose commits go through a `RemoteCoordinator`, parameterized by the variant
/// marker `R`.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; cloned into the writer, the log sinker and the coordinator.
    param: SinkParam,
    /// Ties the struct to its variant marker without storing a value of `R`.
    _phantom: PhantomData<R>,
}
530
/// Forwards the secret-enforced property set of the variant marker `R`.
impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
534
535impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
536    type Error = SinkError;
537
538    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
539        Ok(Self {
540            param,
541            _phantom: PhantomData,
542        })
543    }
544}
545
546impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
547    type Coordinator = RemoteCoordinator;
548    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
549
550    const SINK_NAME: &'static str = R::SINK_NAME;
551
552    async fn validate(&self) -> Result<()> {
553        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
554        Ok(())
555    }
556
557    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
558        let metrics = SinkWriterMetrics::new(&writer_param);
559        CoordinatedLogSinker::new(
560            &writer_param,
561            self.param.clone(),
562            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
563            NonZero::new(1).unwrap(),
564        )
565        .await
566    }
567
568    fn is_coordinated_sink(&self) -> bool {
569        true
570    }
571
572    async fn new_coordinator(
573        &self,
574        _db: DatabaseConnection,
575        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
576    ) -> Result<Self::Coordinator> {
577        RemoteCoordinator::new::<R>(self.param.clone()).await
578    }
579}
580
/// Sink writer that forwards chunks, barriers and commits to a remote sink writer stream.
pub struct CoordinatedRemoteSinkWriter {
    /// Current epoch; `None` until `begin_epoch` has been called.
    epoch: Option<u64>,
    /// Id attached to the next chunk; incremented after every successfully sent chunk.
    batch_id: u64,
    /// Handle of the sink writer stream used for all requests.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated on every written batch.
    metrics: SinkWriterMetrics,
}
587
588impl CoordinatedRemoteSinkWriter {
589    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
590        let sink_proto = param.to_proto();
591        let stream_handle = EmbeddedConnectorClient::new()?
592            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
593            .await?;
594
595        Ok(Self {
596            epoch: None,
597            batch_id: 0,
598            stream_handle,
599            metrics,
600        })
601    }
602
603    #[cfg(test)]
604    fn for_test(
605        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
606        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
607    ) -> CoordinatedRemoteSinkWriter {
608        use futures::StreamExt;
609
610        let stream_handle = SinkWriterStreamHandle::for_test(
611            request_sender,
612            ReceiverStream::new(response_receiver)
613                .map_err(RpcError::from)
614                .boxed(),
615        );
616
617        CoordinatedRemoteSinkWriter {
618            epoch: None,
619            batch_id: 0,
620            stream_handle,
621            metrics: SinkWriterMetrics::for_test(),
622        }
623    }
624}
625
626#[async_trait]
627impl SinkWriter for CoordinatedRemoteSinkWriter {
628    type CommitMetadata = Option<SinkMetadata>;
629
630    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
631        let cardinality = chunk.cardinality();
632        self.metrics
633            .connector_sink_rows_received
634            .inc_by(cardinality as _);
635
636        let epoch = self.epoch.ok_or_else(|| {
637            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
638        })?;
639        let batch_id = self.batch_id;
640        self.stream_handle
641            .request_sender
642            .send_request(JniSinkWriterStreamRequest::Chunk {
643                chunk,
644                epoch,
645                batch_id,
646            })
647            .await?;
648        self.batch_id += 1;
649        Ok(())
650    }
651
652    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
653        self.epoch = Some(epoch);
654        Ok(())
655    }
656
657    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
658        let epoch = self.epoch.ok_or_else(|| {
659            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
660        })?;
661        if is_checkpoint {
662            // TODO: add metrics to measure commit time
663            let rsp = self.stream_handle.commit(epoch).await?;
664            rsp.metadata
665                .ok_or_else(|| {
666                    SinkError::Remote(anyhow!(
667                        "get none metadata in commit response for coordinated sink writer"
668                    ))
669                })
670                .map(Some)
671        } else {
672            self.stream_handle.barrier(epoch).await?;
673            Ok(None)
674        }
675    }
676}
677
/// Commit coordinator that delegates commits to a remote sink coordinator stream.
pub struct RemoteCoordinator {
    /// Handle of the sink coordinator stream used to issue commits.
    stream_handle: SinkCoordinatorStreamHandle,
}
681
682impl RemoteCoordinator {
683    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
684        let stream_handle = EmbeddedConnectorClient::new()?
685            .start_sink_coordinator_stream(param.clone())
686            .await?;
687
688        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
689
690        Ok(RemoteCoordinator { stream_handle })
691    }
692}
693
694#[async_trait]
695impl SinkCommitCoordinator for RemoteCoordinator {
696    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
697        Ok(None)
698    }
699
700    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
701        Ok(self.stream_handle.commit(epoch, metadata).await?)
702    }
703}
704
/// Client for the connector handlers that run inside the embedded JVM.
struct EmbeddedConnectorClient {
    /// The JVM instance, obtained from the global `JVM` in `new`.
    jvm: &'static JavaVM,
}
708
impl EmbeddedConnectorClient {
    /// Creates a client bound to the global JVM, initializing the JVM if necessary.
    ///
    /// # Errors
    ///
    /// Returns an error if the JVM cannot be obtained.
    fn new() -> Result<Self> {
        let jvm = JVM
            .get_or_init()
            .context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink writer stream backed by `JniSinkWriterHandler.runJniSinkWriterThread`.
    ///
    /// A `StartSink` request carrying `sink_proto` and `payload_schema` is sent as the first
    /// message, and the first response must be a `Start` response.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream cannot be initialized, or `SinkError::Internal` if the
    /// first response is not a `Start` response.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // Hands the request receiver to a JVM worker thread and exposes that thread's
            // responses as the response stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a sink coordinator stream backed by
    /// `JniSinkCoordinatorHandler.runJniSinkCoordinatorThread`.
    ///
    /// A `StartCoordinator` request carrying the sink parameters is sent as the first
    /// message, and the first response must be a `Start` response.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream cannot be initialized, or `SinkError::Internal` if the
    /// first response is not a `Start` response.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            // Same wiring as the writer stream, but targeting the coordinator handler.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Spawns an OS thread that runs the static Java method `class_name::method_name` and
    /// returns the receiver on which that thread's responses arrive.
    ///
    /// The Java method is called with two `long` arguments: raw pointers to the request
    /// receiver and to the response sender. Both values are owned by the spawned thread's
    /// closure, so they stay alive for the duration of the call.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                // NOTE(review): presumably the Java side uses these pointers to call back
                // into Rust for receiving requests and sending responses — confirm against
                // the JNI handler implementation.
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                // The outcome of the Java call is only logged; it is not forwarded on the
                // response channel.
                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Since the closure above always returns `Ok`, an error here comes from
            // `execute_with_jni_env` itself; forward it to the response receiver.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
825
826#[cfg(test)]
827mod test {
828    use std::time::Duration;
829
830    use risingwave_common::array::StreamChunk;
831    use risingwave_common::test_prelude::StreamChunkTestExt;
832    use risingwave_jni_core::JniSinkWriterStreamRequest;
833    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
834    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
835    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
836    use tokio::sync::mpsc;
837
838    use crate::sink::SinkWriter;
839    use crate::sink::remote::CoordinatedRemoteSinkWriter;
840
841    #[tokio::test]
842    async fn test_epoch_check() {
843        let (request_sender, mut request_recv) = mpsc::channel(16);
844        let (_, resp_recv) = mpsc::channel(16);
845
846        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
847        let chunk = StreamChunk::from_pretty(
848            " i T
849            + 1 Ripper
850        ",
851        );
852
853        // test epoch check
854        assert!(
855            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
856                .await
857                .expect("test failed: should not commit without epoch")
858                .is_err(),
859            "test failed: no epoch check for commit()"
860        );
861        assert!(
862            request_recv.try_recv().is_err(),
863            "test failed: unchecked epoch before request"
864        );
865
866        assert!(
867            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
868                .await
869                .expect("test failed: should not write without epoch")
870                .is_err(),
871            "test failed: no epoch check for write_batch()"
872        );
873        assert!(
874            request_recv.try_recv().is_err(),
875            "test failed: unchecked epoch before request"
876        );
877    }
878
879    #[tokio::test]
880    async fn test_remote_sink() {
881        let (request_sender, mut request_receiver) = mpsc::channel(16);
882        let (response_sender, response_receiver) = mpsc::channel(16);
883        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);
884
885        let chunk_a = StreamChunk::from_pretty(
886            " i T
887            + 1 Alice
888            + 2 Bob
889            + 3 Clare
890        ",
891        );
892        let chunk_b = StreamChunk::from_pretty(
893            " i T
894            + 4 David
895            + 5 Eve
896            + 6 Frank
897        ",
898        );
899
900        // test write batch
901        sink.begin_epoch(2022).await.unwrap();
902        assert_eq!(sink.epoch, Some(2022));
903
904        sink.write_batch(chunk_a.clone()).await.unwrap();
905        assert_eq!(sink.epoch, Some(2022));
906        assert_eq!(sink.batch_id, 1);
907        match request_receiver.recv().await.unwrap() {
908            JniSinkWriterStreamRequest::Chunk {
909                epoch,
910                batch_id,
911                chunk,
912            } => {
913                assert_eq!(epoch, 2022);
914                assert_eq!(batch_id, 0);
915                assert_eq!(chunk, chunk_a);
916            }
917            _ => panic!("test failed: failed to construct write request"),
918        }
919
920        // test commit
921        response_sender
922            .send(Ok(SinkWriterStreamResponse {
923                response: Some(Response::Commit(CommitResponse {
924                    epoch: 2022,
925                    metadata: None,
926                })),
927            }))
928            .await
929            .expect("test failed: failed to sync epoch");
930        sink.barrier(false).await.unwrap();
931        let commit_request = request_receiver.recv().await.unwrap();
932        match commit_request {
933            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
934                request:
935                    Some(Request::Barrier(Barrier {
936                        epoch,
937                        is_checkpoint: false,
938                    })),
939            }) => {
940                assert_eq!(epoch, 2022);
941            }
942            _ => panic!("test failed: failed to construct sync request "),
943        };
944
945        // begin another epoch
946        sink.begin_epoch(2023).await.unwrap();
947        assert_eq!(sink.epoch, Some(2023));
948
949        // test another write
950        sink.write_batch(chunk_b.clone()).await.unwrap();
951        assert_eq!(sink.epoch, Some(2023));
952        assert_eq!(sink.batch_id, 2);
953        match request_receiver.recv().await.unwrap() {
954            JniSinkWriterStreamRequest::Chunk {
955                epoch,
956                batch_id,
957                chunk,
958            } => {
959                assert_eq!(epoch, 2023);
960                assert_eq!(batch_id, 1);
961                assert_eq!(chunk, chunk_b);
962            }
963            _ => panic!("test failed: failed to construct write request"),
964        }
965    }
966}