risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::{JVM, execute_with_jni_env};
36use risingwave_jni_core::{
37    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41    Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47    sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52    SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use sea_orm::DatabaseConnection;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::SinkCommittedEpochSubscriber;
64use super::elasticsearch_opensearch::elasticsearch_converter::{
65    StreamChunkConverter, is_remote_es_sink,
66};
67use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
68use crate::connector_common::IcebergSinkCompactionUpdate;
69use crate::enforce_secret::EnforceSecret;
70use crate::error::ConnectorResult;
71use crate::sink::coordinate::CoordinatedLogSinker;
72use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
73use crate::sink::writer::SinkWriter;
74use crate::sink::{
75    DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
76    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
77};
78
79macro_rules! def_remote_sink {
80    () => {
81        def_remote_sink! {
82            //todo!, delete java impl
83            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
84            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
85            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
86            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
87        }
88    };
89    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
90        #[derive(Debug)]
91        pub struct $variant_name;
92        impl RemoteSinkTrait for $variant_name {
93            const SINK_NAME: &'static str = $sink_name;
94        }
95        impl EnforceSecret for $variant_name {
96            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
97                $($enforce_secret_prop),*
98            };
99        }
100        pub type $sink_type_name = RemoteSink<$variant_name>;
101    };
102    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
103        #[derive(Debug)]
104        pub struct $variant_name;
105        impl RemoteSinkTrait for $variant_name {
106            const SINK_NAME: &'static str = $sink_name;
107            fn default_sink_decouple($desc: &SinkDesc) -> bool {
108                $body
109            }
110        }
111        impl EnforceSecret for $variant_name {
112            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
113                $($enforce_secret_prop),*
114            };
115        }
116        pub type $sink_type_name = RemoteSink<$variant_name>;
117    };
118    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
119        def_remote_sink! {
120            {$($first)+}
121        }
122        def_remote_sink! {
123            $({$($rest)+})*
124        }
125    };
126    ($($invalid:tt)*) => {
127        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
128    }
129}
130
131def_remote_sink!();
132
133pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
134    const SINK_NAME: &'static str;
135    fn default_sink_decouple() -> bool {
136        true
137    }
138}
139
140#[derive(Debug)]
141pub struct RemoteSink<R: RemoteSinkTrait> {
142    param: SinkParam,
143    _phantom: PhantomData<R>,
144}
145
146impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
147    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
148}
149
150impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
151    type Error = SinkError;
152
153    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
154        Ok(Self {
155            param,
156            _phantom: PhantomData,
157        })
158    }
159}
160
161impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
162    type Coordinator = DummySinkCommitCoordinator;
163    type LogSinker = RemoteLogSinker;
164
165    const SINK_NAME: &'static str = R::SINK_NAME;
166
167    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
168        match user_specified {
169            SinkDecouple::Default => Ok(R::default_sink_decouple()),
170            SinkDecouple::Enable => Ok(true),
171            SinkDecouple::Disable => Ok(false),
172        }
173    }
174
175    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
176        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
177    }
178
179    async fn validate(&self) -> Result<()> {
180        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
181        Ok(())
182    }
183}
184
185async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
186    // if sink_name == OpenSearchJavaSink::SINK_NAME {
187    //     risingwave_common::license::Feature::OpenSearchSink
188    //         .check_available()
189    //         .map_err(|e| anyhow::anyhow!(e))?;
190    // }
191    if is_remote_es_sink(sink_name)
192        && param.downstream_pk.len() > 1
193        && !param.properties.contains_key(ES_OPTION_DELIMITER)
194    {
195        bail!("Es sink only supports single pk or pk with delimiter option");
196    }
197    // FIXME: support struct and array in stream sink
198    param.columns.iter().try_for_each(|col| {
199        match &col.data_type {
200            DataType::Int16
201                    | DataType::Int32
202                    | DataType::Int64
203                    | DataType::Float32
204                    | DataType::Float64
205                    | DataType::Boolean
206                    | DataType::Decimal
207                    | DataType::Timestamp
208                    | DataType::Timestamptz
209                    | DataType::Varchar
210                    | DataType::Date
211                    | DataType::Time
212                    | DataType::Interval
213                    | DataType::Jsonb
214                    | DataType::Bytea => Ok(()),
215            DataType::List(list) => {
216                if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
217                    Ok(())
218                } else{
219                    Err(SinkError::Remote(anyhow!(
220                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
221                        col.name,
222                        col.data_type,
223                    )))
224                }
225            },
226            DataType::Struct(_) => {
227                if is_remote_es_sink(sink_name){
228                    Ok(())
229                }else{
230                    Err(SinkError::Remote(anyhow!(
231                        "Only Es sink supports struct, got {:?}: {:?}",
232                        col.name,
233                        col.data_type,
234                    )))
235                }
236            },
237            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
238                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
239                            col.name,
240                            col.data_type,
241                        )))}})?;
242
243    let jvm = JVM.get_or_init()?;
244    let sink_param = param.to_proto();
245
246    spawn_blocking(move || -> anyhow::Result<()> {
247        execute_with_jni_env(jvm, |env| {
248            let validate_sink_request = ValidateSinkRequest {
249                sink_param: Some(sink_param),
250            };
251            let validate_sink_request_bytes =
252                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
253
254            let validate_sink_response_bytes = call_static_method!(
255                env,
256                {com.risingwave.connector.JniSinkValidationHandler},
257                {byte[] validate(byte[] validateSourceRequestBytes)},
258                &validate_sink_request_bytes
259            )?;
260
261            let validate_sink_response: ValidateSinkResponse = Message::decode(
262                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
263            )?;
264
265            validate_sink_response.error.map_or_else(
266                || Ok(()), // If there is no error message, return Ok here.
267                |err| bail!("sink cannot pass validation: {}", err.error_message),
268            )
269        })
270    })
271    .await
272    .context("JoinHandle returns error")??;
273
274    Ok(())
275}
276
277pub struct RemoteLogSinker {
278    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
279    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
280    stream_chunk_converter: StreamChunkConverter,
281    sink_writer_metrics: SinkWriterMetrics,
282}
283
284impl RemoteLogSinker {
285    async fn new(
286        sink_param: SinkParam,
287        writer_param: SinkWriterParam,
288        sink_name: &str,
289    ) -> Result<Self> {
290        let sink_proto = sink_param.to_proto();
291        let payload_schema = if is_remote_es_sink(sink_name) {
292            let columns = vec![
293                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
294                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
295                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
296                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
297            ];
298            Some(TableSchema {
299                columns,
300                pk_indices: vec![],
301            })
302        } else {
303            sink_proto.table_schema.clone()
304        };
305
306        let SinkWriterStreamHandle {
307            request_sender,
308            response_stream,
309        } = EmbeddedConnectorClient::new()?
310            .start_sink_writer_stream(payload_schema, sink_proto)
311            .await?;
312
313        Ok(RemoteLogSinker {
314            request_sender,
315            response_stream,
316            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
317            stream_chunk_converter: StreamChunkConverter::new(
318                sink_name,
319                sink_param.schema(),
320                &sink_param.downstream_pk,
321                &sink_param.properties,
322                sink_param.sink_type.is_append_only(),
323            )?,
324        })
325    }
326}
327
328#[async_trait]
329impl LogSinker for RemoteLogSinker {
330    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
331        log_reader.start_from(None).await?;
332        let mut request_tx = self.request_sender;
333        let mut response_err_stream_rx = self.response_stream;
334        let sink_writer_metrics = self.sink_writer_metrics;
335
336        let (response_tx, mut response_rx) = unbounded_channel();
337
338        let poll_response_stream = async move {
339            loop {
340                let result = response_err_stream_rx
341                    .stream
342                    .try_next()
343                    .instrument_await("log_sinker_wait_next_response")
344                    .await;
345                match result {
346                    Ok(Some(response)) => {
347                        response_tx.send(response).map_err(|err| {
348                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
349                        })?;
350                    }
351                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
352                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
353                }
354            }
355        };
356
357        let poll_consume_log_and_sink = async move {
358            fn truncate_matched_offset(
359                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
360                persisted_offset: TruncateOffset,
361                log_reader: &mut impl SinkLogReader,
362                sink_writer_metrics: &SinkWriterMetrics,
363            ) -> Result<()> {
364                while let Some((sent_offset, _)) = queue.front()
365                    && sent_offset < &persisted_offset
366                {
367                    queue.pop_front();
368                }
369
370                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
371                    anyhow!("get unsent offset {:?} in response", persisted_offset)
372                })?;
373                if sent_offset != persisted_offset {
374                    bail!(
375                        "new response offset {:?} does not match the buffer offset {:?}",
376                        persisted_offset,
377                        sent_offset
378                    );
379                }
380
381                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
382                    (persisted_offset, start_time)
383                {
384                    sink_writer_metrics
385                        .sink_commit_duration
386                        .observe(start_time.elapsed().as_millis() as f64);
387                }
388
389                log_reader.truncate(persisted_offset)?;
390                Ok(())
391            }
392
393            let mut prev_offset: Option<TruncateOffset> = None;
394            // Push from back and pop from front
395            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
396                VecDeque::new();
397
398            loop {
399                let either_result: futures::future::Either<
400                    Option<SinkWriterStreamResponse>,
401                    LogStoreResult<(u64, LogStoreReadItem)>,
402                > = drop_either_future(
403                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
404                );
405                match either_result {
406                    futures::future::Either::Left(opt) => {
407                        let response = opt.context("end of response stream")?;
408                        match response {
409                            SinkWriterStreamResponse {
410                                response:
411                                    Some(sink_writer_stream_response::Response::Batch(
412                                        sink_writer_stream_response::BatchWrittenResponse {
413                                            epoch,
414                                            batch_id,
415                                        },
416                                    )),
417                            } => {
418                                truncate_matched_offset(
419                                    &mut sent_offset_queue,
420                                    TruncateOffset::Chunk {
421                                        epoch,
422                                        chunk_id: batch_id as _,
423                                    },
424                                    &mut log_reader,
425                                    &sink_writer_metrics,
426                                )?;
427                            }
428                            SinkWriterStreamResponse {
429                                response:
430                                    Some(sink_writer_stream_response::Response::Commit(
431                                        sink_writer_stream_response::CommitResponse {
432                                            epoch,
433                                            metadata,
434                                        },
435                                    )),
436                            } => {
437                                if let Some(metadata) = metadata {
438                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
439                                }
440                                truncate_matched_offset(
441                                    &mut sent_offset_queue,
442                                    TruncateOffset::Barrier { epoch },
443                                    &mut log_reader,
444                                    &sink_writer_metrics,
445                                )?;
446                            }
447                            response => {
448                                return Err(SinkError::Remote(anyhow!(
449                                    "get unexpected response: {:?}",
450                                    response
451                                )));
452                            }
453                        }
454                    }
455                    futures::future::Either::Right(result) => {
456                        let (epoch, item): (u64, LogStoreReadItem) = result?;
457
458                        match item {
459                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
460                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
461                                if let Some(prev_offset) = &prev_offset {
462                                    prev_offset.check_next_offset(offset)?;
463                                }
464                                let cardinality = chunk.cardinality();
465                                sink_writer_metrics
466                                    .connector_sink_rows_received
467                                    .inc_by(cardinality as _);
468
469                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
470                                request_tx
471                                    .send_request(JniSinkWriterStreamRequest::Chunk {
472                                        epoch,
473                                        batch_id: chunk_id as u64,
474                                        chunk,
475                                    })
476                                    .instrument_await(span!(
477                                        "log_sinker_send_chunk (chunk {chunk_id})"
478                                    ))
479                                    .await?;
480                                prev_offset = Some(offset);
481                                sent_offset_queue
482                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
483                            }
484                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
485                                let offset = TruncateOffset::Barrier { epoch };
486                                if let Some(prev_offset) = &prev_offset {
487                                    prev_offset.check_next_offset(offset)?;
488                                }
489                                let start_time = if is_checkpoint {
490                                    let start_time = Instant::now();
491                                    request_tx
492                                        .barrier(epoch, true)
493                                        .instrument_await(span!(
494                                            "log_sinker_commit_checkpoint (epoch {epoch})"
495                                        ))
496                                        .await?;
497                                    Some(start_time)
498                                } else {
499                                    request_tx
500                                        .barrier(epoch, false)
501                                        .instrument_await(span!(
502                                            "log_sinker_send_barrier (epoch {epoch})"
503                                        ))
504                                        .await?;
505                                    None
506                                };
507                                prev_offset = Some(offset);
508                                sent_offset_queue
509                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
510                            }
511                        }
512                    }
513                }
514            }
515        };
516
517        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
518            .await
519            .factor_first()
520            .0
521    }
522}
523
524#[derive(Debug)]
525pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
526    param: SinkParam,
527    _phantom: PhantomData<R>,
528}
529
530impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
531    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
532}
533
534impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
535    type Error = SinkError;
536
537    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
538        Ok(Self {
539            param,
540            _phantom: PhantomData,
541        })
542    }
543}
544
545impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
546    type Coordinator = RemoteCoordinator;
547    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
548
549    const SINK_NAME: &'static str = R::SINK_NAME;
550
551    async fn validate(&self) -> Result<()> {
552        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
553        Ok(())
554    }
555
556    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
557        let metrics = SinkWriterMetrics::new(&writer_param);
558        CoordinatedLogSinker::new(
559            &writer_param,
560            self.param.clone(),
561            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
562            NonZero::new(1).unwrap(),
563        )
564        .await
565    }
566
567    fn is_coordinated_sink(&self) -> bool {
568        true
569    }
570
571    async fn new_coordinator(
572        &self,
573        _db: DatabaseConnection,
574        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
575    ) -> Result<Self::Coordinator> {
576        RemoteCoordinator::new::<R>(self.param.clone()).await
577    }
578}
579
580pub struct CoordinatedRemoteSinkWriter {
581    epoch: Option<u64>,
582    batch_id: u64,
583    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
584    metrics: SinkWriterMetrics,
585}
586
587impl CoordinatedRemoteSinkWriter {
588    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
589        let sink_proto = param.to_proto();
590        let stream_handle = EmbeddedConnectorClient::new()?
591            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
592            .await?;
593
594        Ok(Self {
595            epoch: None,
596            batch_id: 0,
597            stream_handle,
598            metrics,
599        })
600    }
601
602    #[cfg(test)]
603    fn for_test(
604        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
605        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
606    ) -> CoordinatedRemoteSinkWriter {
607        use futures::StreamExt;
608
609        let stream_handle = SinkWriterStreamHandle::for_test(
610            request_sender,
611            ReceiverStream::new(response_receiver)
612                .map_err(RpcError::from)
613                .boxed(),
614        );
615
616        CoordinatedRemoteSinkWriter {
617            epoch: None,
618            batch_id: 0,
619            stream_handle,
620            metrics: SinkWriterMetrics::for_test(),
621        }
622    }
623}
624
625#[async_trait]
626impl SinkWriter for CoordinatedRemoteSinkWriter {
627    type CommitMetadata = Option<SinkMetadata>;
628
629    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
630        let cardinality = chunk.cardinality();
631        self.metrics
632            .connector_sink_rows_received
633            .inc_by(cardinality as _);
634
635        let epoch = self.epoch.ok_or_else(|| {
636            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
637        })?;
638        let batch_id = self.batch_id;
639        self.stream_handle
640            .request_sender
641            .send_request(JniSinkWriterStreamRequest::Chunk {
642                chunk,
643                epoch,
644                batch_id,
645            })
646            .await?;
647        self.batch_id += 1;
648        Ok(())
649    }
650
651    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
652        self.epoch = Some(epoch);
653        Ok(())
654    }
655
656    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
657        let epoch = self.epoch.ok_or_else(|| {
658            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
659        })?;
660        if is_checkpoint {
661            // TODO: add metrics to measure commit time
662            let rsp = self.stream_handle.commit(epoch).await?;
663            rsp.metadata
664                .ok_or_else(|| {
665                    SinkError::Remote(anyhow!(
666                        "get none metadata in commit response for coordinated sink writer"
667                    ))
668                })
669                .map(Some)
670        } else {
671            self.stream_handle.barrier(epoch).await?;
672            Ok(None)
673        }
674    }
675}
676
677pub struct RemoteCoordinator {
678    stream_handle: SinkCoordinatorStreamHandle,
679}
680
681impl RemoteCoordinator {
682    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
683        let stream_handle = EmbeddedConnectorClient::new()?
684            .start_sink_coordinator_stream(param.clone())
685            .await?;
686
687        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
688
689        Ok(RemoteCoordinator { stream_handle })
690    }
691}
692
693#[async_trait]
694impl SinkCommitCoordinator for RemoteCoordinator {
695    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
696        Ok(None)
697    }
698
699    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
700        Ok(self.stream_handle.commit(epoch, metadata).await?)
701    }
702}
703
704struct EmbeddedConnectorClient {
705    jvm: &'static JavaVM,
706}
707
708impl EmbeddedConnectorClient {
709    fn new() -> Result<Self> {
710        let jvm = JVM
711            .get_or_init()
712            .context("failed to create EmbeddedConnectorClient")?;
713        Ok(EmbeddedConnectorClient { jvm })
714    }
715
716    async fn start_sink_writer_stream(
717        &self,
718        payload_schema: Option<TableSchema>,
719        sink_proto: PbSinkParam,
720    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
721        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
722            SinkWriterStreamRequest {
723                request: Some(SinkRequest::Start(StartSink {
724                    sink_param: Some(sink_proto),
725                    payload_schema,
726                })),
727            },
728            |rx| async move {
729                let rx = self.start_jvm_worker_thread(
730                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
731                    "runJniSinkWriterThread",
732                    rx,
733                );
734                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
735            },
736        )
737        .await?;
738
739        match first_rsp {
740            SinkWriterStreamResponse {
741                response: Some(sink_writer_stream_response::Response::Start(_)),
742            } => Ok(handle),
743            msg => Err(SinkError::Internal(anyhow!(
744                "should get start response but get {:?}",
745                msg
746            ))),
747        }
748    }
749
750    async fn start_sink_coordinator_stream(
751        &self,
752        param: SinkParam,
753    ) -> Result<SinkCoordinatorStreamHandle> {
754        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
755            SinkCoordinatorStreamRequest {
756                request: Some(sink_coordinator_stream_request::Request::Start(
757                    StartCoordinator {
758                        param: Some(param.to_proto()),
759                    },
760                )),
761            },
762            |rx| async move {
763                let rx = self.start_jvm_worker_thread(
764                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
765                    "runJniSinkCoordinatorThread",
766                    rx,
767                );
768                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
769            },
770        )
771        .await?;
772
773        match first_rsp {
774            SinkCoordinatorStreamResponse {
775                response: Some(sink_coordinator_stream_response::Response::Start(_)),
776            } => Ok(handle),
777            msg => Err(SinkError::Internal(anyhow!(
778                "should get start response but get {:?}",
779                msg
780            ))),
781        }
782    }
783
784    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
785        &self,
786        class_name: &'static str,
787        method_name: &'static str,
788        mut request_rx: JniReceiverType<REQ>,
789    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
790        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
791            mpsc::channel(DEFAULT_BUFFER_SIZE);
792
793        let jvm = self.jvm;
794        std::thread::spawn(move || {
795            let result = execute_with_jni_env(jvm, |env| {
796                let result = call_static_method!(
797                    env,
798                    class_name,
799                    method_name,
800                    {{void}, {long requestRx, long responseTx}},
801                    &mut request_rx as *mut JniReceiverType<REQ>,
802                    &mut response_tx as *mut JniSenderType<RSP>
803                );
804
805                match result {
806                    Ok(_) => {
807                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
808                    }
809                    Err(e) => {
810                        tracing::error!(error = %e.as_report(), "jni call error");
811                    }
812                };
813
814                Ok(())
815            });
816
817            if let Err(e) = result {
818                let _ = response_tx.blocking_send(Err(e));
819            }
820        });
821        response_rx
822    }
823}
824
825#[cfg(test)]
826mod test {
827    use std::time::Duration;
828
829    use risingwave_common::array::StreamChunk;
830    use risingwave_common::test_prelude::StreamChunkTestExt;
831    use risingwave_jni_core::JniSinkWriterStreamRequest;
832    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
833    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
834    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
835    use tokio::sync::mpsc;
836
837    use crate::sink::SinkWriter;
838    use crate::sink::remote::CoordinatedRemoteSinkWriter;
839
840    #[tokio::test]
841    async fn test_epoch_check() {
842        let (request_sender, mut request_recv) = mpsc::channel(16);
843        let (_, resp_recv) = mpsc::channel(16);
844
845        let mut sink = CoordinatedRemoteSinkWriter::for_test(resp_recv, request_sender);
846        let chunk = StreamChunk::from_pretty(
847            " i T
848            + 1 Ripper
849        ",
850        );
851
852        // test epoch check
853        assert!(
854            tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
855                .await
856                .expect("test failed: should not commit without epoch")
857                .is_err(),
858            "test failed: no epoch check for commit()"
859        );
860        assert!(
861            request_recv.try_recv().is_err(),
862            "test failed: unchecked epoch before request"
863        );
864
865        assert!(
866            tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
867                .await
868                .expect("test failed: should not write without epoch")
869                .is_err(),
870            "test failed: no epoch check for write_batch()"
871        );
872        assert!(
873            request_recv.try_recv().is_err(),
874            "test failed: unchecked epoch before request"
875        );
876    }
877
878    #[tokio::test]
879    async fn test_remote_sink() {
880        let (request_sender, mut request_receiver) = mpsc::channel(16);
881        let (response_sender, response_receiver) = mpsc::channel(16);
882        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_receiver, request_sender);
883
884        let chunk_a = StreamChunk::from_pretty(
885            " i T
886            + 1 Alice
887            + 2 Bob
888            + 3 Clare
889        ",
890        );
891        let chunk_b = StreamChunk::from_pretty(
892            " i T
893            + 4 David
894            + 5 Eve
895            + 6 Frank
896        ",
897        );
898
899        // test write batch
900        sink.begin_epoch(2022).await.unwrap();
901        assert_eq!(sink.epoch, Some(2022));
902
903        sink.write_batch(chunk_a.clone()).await.unwrap();
904        assert_eq!(sink.epoch, Some(2022));
905        assert_eq!(sink.batch_id, 1);
906        match request_receiver.recv().await.unwrap() {
907            JniSinkWriterStreamRequest::Chunk {
908                epoch,
909                batch_id,
910                chunk,
911            } => {
912                assert_eq!(epoch, 2022);
913                assert_eq!(batch_id, 0);
914                assert_eq!(chunk, chunk_a);
915            }
916            _ => panic!("test failed: failed to construct write request"),
917        }
918
919        // test commit
920        response_sender
921            .send(Ok(SinkWriterStreamResponse {
922                response: Some(Response::Commit(CommitResponse {
923                    epoch: 2022,
924                    metadata: None,
925                })),
926            }))
927            .await
928            .expect("test failed: failed to sync epoch");
929        sink.barrier(false).await.unwrap();
930        let commit_request = request_receiver.recv().await.unwrap();
931        match commit_request {
932            JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
933                request:
934                    Some(Request::Barrier(Barrier {
935                        epoch,
936                        is_checkpoint: false,
937                    })),
938            }) => {
939                assert_eq!(epoch, 2022);
940            }
941            _ => panic!("test failed: failed to construct sync request "),
942        };
943
944        // begin another epoch
945        sink.begin_epoch(2023).await.unwrap();
946        assert_eq!(sink.epoch, Some(2023));
947
948        // test another write
949        sink.write_batch(chunk_b.clone()).await.unwrap();
950        assert_eq!(sink.epoch, Some(2023));
951        assert_eq!(sink.batch_id, 2);
952        match request_receiver.recv().await.unwrap() {
953            JniSinkWriterStreamRequest::Chunk {
954                epoch,
955                batch_id,
956                chunk,
957            } => {
958                assert_eq!(epoch, 2023);
959                assert_eq!(batch_id, 1);
960                assert_eq!(chunk, chunk_b);
961            }
962            _ => panic!("test failed: failed to construct write request"),
963        }
964    }
965}