risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use phf::phf_set;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId, Field};
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
36use risingwave_jni_core::{
37    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41    Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47    sink_writer_stream_response,
48};
49use risingwave_rpc_client::error::RpcError;
50use risingwave_rpc_client::{
51    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
52    SinkWriterStreamHandle,
53};
54use rw_futures_util::drop_either_future;
55use thiserror_ext::AsReport;
56use tokio::sync::mpsc;
57use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
58use tokio::task::spawn_blocking;
59use tokio_stream::wrappers::ReceiverStream;
60use tracing::warn;
61
62use super::elasticsearch_opensearch::elasticsearch_converter::{
63    StreamChunkConverter, is_remote_es_sink,
64};
65use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
66use crate::connector_common::IcebergSinkCompactionUpdate;
67use crate::enforce_secret::EnforceSecret;
68use crate::error::ConnectorResult;
69use crate::sink::coordinate::CoordinatedLogSinker;
70use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
71use crate::sink::writer::SinkWriter;
72use crate::sink::{
73    LogSinker, Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError,
74    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
75};
76
77macro_rules! def_remote_sink {
78    () => {
79        def_remote_sink! {
80            //todo!, delete java impl
81            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
82            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
83            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
84            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
85        }
86    };
87    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
88        #[derive(Debug)]
89        pub struct $variant_name;
90        impl RemoteSinkTrait for $variant_name {
91            const SINK_NAME: &'static str = $sink_name;
92        }
93        impl EnforceSecret for $variant_name {
94            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
95                $($enforce_secret_prop),*
96            };
97        }
98        pub type $sink_type_name = RemoteSink<$variant_name>;
99    };
100    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
101        #[derive(Debug)]
102        pub struct $variant_name;
103        impl RemoteSinkTrait for $variant_name {
104            const SINK_NAME: &'static str = $sink_name;
105            fn default_sink_decouple($desc: &SinkDesc) -> bool {
106                $body
107            }
108        }
109        impl EnforceSecret for $variant_name {
110            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
111                $($enforce_secret_prop),*
112            };
113        }
114        pub type $sink_type_name = RemoteSink<$variant_name>;
115    };
116    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
117        def_remote_sink! {
118            {$($first)+}
119        }
120        def_remote_sink! {
121            $({$($rest)+})*
122        }
123    };
124    ($($invalid:tt)*) => {
125        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
126    }
127}
128
129def_remote_sink!();
130
/// Static description of a sink implemented by the embedded Java connector.
///
/// Used as the type parameter of [`RemoteSink`] and [`CoordinatedRemoteSink`]; the
/// implementors generated by `def_remote_sink!` are unit marker types.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Connector name; re-exported as [`Sink::SINK_NAME`] by the sinks parameterized by this type.
    const SINK_NAME: &'static str;
    /// Decoupling mode used by `RemoteSink::is_sink_decouple` when the user leaves it at
    /// `SinkDecouple::Default`. Defaults to `true`.
    fn default_sink_decouple() -> bool {
        true
    }
}
137
138#[derive(Debug)]
139pub struct RemoteSink<R: RemoteSinkTrait> {
140    param: SinkParam,
141    _phantom: PhantomData<R>,
142}
143
144impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
145    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
146}
147
148impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
149    type Error = SinkError;
150
151    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
152        Ok(Self {
153            param,
154            _phantom: PhantomData,
155        })
156    }
157}
158
159impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
160    type LogSinker = RemoteLogSinker;
161
162    const SINK_NAME: &'static str = R::SINK_NAME;
163
164    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
165        match user_specified {
166            SinkDecouple::Default => Ok(R::default_sink_decouple()),
167            SinkDecouple::Enable => Ok(true),
168            SinkDecouple::Disable => Ok(false),
169        }
170    }
171
172    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
173        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
174    }
175
176    async fn validate(&self) -> Result<()> {
177        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
178        Ok(())
179    }
180}
181
182async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
183    // if sink_name == OpenSearchJavaSink::SINK_NAME {
184    //     risingwave_common::license::Feature::OpenSearchSink
185    //         .check_available()
186    //         .map_err(|e| anyhow::anyhow!(e))?;
187    // }
188    if is_remote_es_sink(sink_name)
189        && param.downstream_pk_or_empty().len() > 1
190        && !param.properties.contains_key(ES_OPTION_DELIMITER)
191    {
192        bail!("Es sink only supports single pk or pk with delimiter option");
193    }
194    // FIXME: support struct and array in stream sink
195    param.columns.iter().try_for_each(|col| {
196        match &col.data_type {
197            DataType::Int16
198                    | DataType::Int32
199                    | DataType::Int64
200                    | DataType::Float32
201                    | DataType::Float64
202                    | DataType::Boolean
203                    | DataType::Decimal
204                    | DataType::Timestamp
205                    | DataType::Timestamptz
206                    | DataType::Varchar
207                    | DataType::Date
208                    | DataType::Time
209                    | DataType::Interval
210                    | DataType::Jsonb
211                    | DataType::Bytea => Ok(()),
212            DataType::List(list) => {
213                if is_remote_es_sink(sink_name) || matches!(list.elem(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
214                    Ok(())
215                } else{
216                    Err(SinkError::Remote(anyhow!(
217                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
218                        col.name,
219                        col.data_type,
220                    )))
221                }
222            },
223            DataType::Struct(_) => {
224                if is_remote_es_sink(sink_name){
225                    Ok(())
226                }else{
227                    Err(SinkError::Remote(anyhow!(
228                        "Only Es sink supports struct, got {:?}: {:?}",
229                        col.name,
230                        col.data_type,
231                    )))
232                }
233            },
234            DataType::Vector(_) |
235            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
236                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
237                            col.name,
238                            col.data_type,
239                        )))}})?;
240
241    let jvm = Jvm::get_or_init()?;
242    let sink_param = param.to_proto();
243
244    spawn_blocking(move || -> anyhow::Result<()> {
245        execute_with_jni_env(jvm, |env| {
246            let validate_sink_request = ValidateSinkRequest {
247                sink_param: Some(sink_param),
248            };
249            let validate_sink_request_bytes =
250                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
251
252            let validate_sink_response_bytes = call_static_method!(
253                env,
254                {com.risingwave.connector.JniSinkValidationHandler},
255                {byte[] validate(byte[] validateSourceRequestBytes)},
256                &validate_sink_request_bytes
257            )?;
258
259            let validate_sink_response: ValidateSinkResponse = Message::decode(
260                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
261            )?;
262
263            validate_sink_response.error.map_or_else(
264                || Ok(()), // If there is no error message, return Ok here.
265                |err| bail!("sink cannot pass validation: {}", err.error_message),
266            )
267        })
268    })
269    .await
270    .context("JoinHandle returns error")??;
271
272    Ok(())
273}
274
/// Log sinker that forwards log-store items to a JVM sink writer over a bidirectional stream
/// and truncates the log store as the JVM acknowledges them.
pub struct RemoteLogSinker {
    // Outgoing chunk and barrier requests to the JVM writer.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    // Incoming acknowledgements (batch-written / commit responses) from the JVM writer.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    // Applied to every chunk before it is sent; built from the sink name and properties.
    stream_chunk_converter: StreamChunkConverter,
    sink_writer_metrics: SinkWriterMetrics,
}
281
282impl RemoteLogSinker {
283    async fn new(
284        sink_param: SinkParam,
285        writer_param: SinkWriterParam,
286        sink_name: &str,
287    ) -> Result<Self> {
288        let sink_proto = sink_param.to_proto();
289        let payload_schema = if is_remote_es_sink(sink_name) {
290            let columns = vec![
291                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
292                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
293                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
294                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
295            ];
296            Some(TableSchema {
297                columns,
298                pk_indices: vec![],
299            })
300        } else {
301            sink_proto.table_schema.clone()
302        };
303
304        let SinkWriterStreamHandle {
305            request_sender,
306            response_stream,
307        } = EmbeddedConnectorClient::new()?
308            .start_sink_writer_stream(payload_schema, sink_proto)
309            .await?;
310
311        Ok(RemoteLogSinker {
312            request_sender,
313            response_stream,
314            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
315            stream_chunk_converter: StreamChunkConverter::new(
316                sink_name,
317                sink_param.schema(),
318                &sink_param.downstream_pk_or_empty(),
319                &sink_param.properties,
320                sink_param.sink_type.is_append_only(),
321            )?,
322        })
323    }
324}
325
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the sink until an error occurs (the success type is `!`).
    ///
    /// Two futures run concurrently, and the result of whichever finishes first is returned.
    /// Both can only finish with an error.
    /// - `poll_response_stream` relays responses from the JVM stream into an unbounded channel.
    ///   It fails when the stream ends, the stream errors, or the channel is closed.
    /// - `poll_consume_log_and_sink` selects between that channel and the log reader. It
    ///   forwards chunks and barriers to the JVM and truncates the log store as they are
    ///   acknowledged.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Relay channel between the response-polling future and the main loop below, so the
        // main loop can `select` on a plain `recv()` next to the log reader.
        let (response_tx, mut response_rx) = unbounded_channel();

        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            /// Handles an acknowledgement for `persisted_offset`.
            ///
            /// Queued offsets that are strictly smaller are discarded first; presumably the JVM
            /// does not acknowledge every chunk individually. The next queued offset must then
            /// equal `persisted_offset`, otherwise an error is returned. For barriers sent with
            /// a start time (checkpoints), the elapsed time is recorded as commit duration.
            /// Finally the log store is truncated up to `persisted_offset`.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item read from the log store, used to check ordering.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Offsets sent to the JVM but not yet acknowledged, in send order.
            // Push from back and pop from front.
            // The `Instant` is only set for checkpoint barriers (commit-latency measurement).
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a JVM response or the next log-store item.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk was written; `batch_id` is the log-store chunk id sent below.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A barrier was processed. This non-coordinated sink does not
                            // expect commit metadata, so any metadata is only logged.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                // The log-store chunk id doubles as the JVM batch id, so the
                                // batch-written response can be mapped back to this offset.
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Only checkpoint barriers get a start time, so only their
                                // acknowledgement is recorded as commit duration.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Whichever future finishes first (always with an error) determines the result.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
521
522#[derive(Debug)]
523pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
524    param: SinkParam,
525    _phantom: PhantomData<R>,
526}
527
528impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
529    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
530}
531
532impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
533    type Error = SinkError;
534
535    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
536        Ok(Self {
537            param,
538            _phantom: PhantomData,
539        })
540    }
541}
542
543impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
544    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
545
546    const SINK_NAME: &'static str = R::SINK_NAME;
547
548    async fn validate(&self) -> Result<()> {
549        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
550        Ok(())
551    }
552
553    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
554        let metrics = SinkWriterMetrics::new(&writer_param);
555        CoordinatedLogSinker::new(
556            &writer_param,
557            self.param.clone(),
558            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
559            NonZero::new(1).unwrap(),
560        )
561        .await
562    }
563
564    fn is_coordinated_sink(&self) -> bool {
565        true
566    }
567
568    async fn new_coordinator(
569        &self,
570        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
571    ) -> Result<SinkCommitCoordinator> {
572        let coordinator = RemoteCoordinator::new::<R>(self.param.clone()).await?;
573        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
574    }
575}
576
/// Sink writer for coordinated remote sinks. It forwards chunks and barriers to a JVM writer
/// stream and returns the metadata from the JVM's commit response at checkpoints.
pub struct CoordinatedRemoteSinkWriter {
    // Epoch from the latest `begin_epoch`; `None` until it is first called.
    epoch: Option<u64>,
    // Id attached to the next chunk; incremented after each chunk and not reset in this file.
    batch_id: u64,
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    metrics: SinkWriterMetrics,
}
583
584impl CoordinatedRemoteSinkWriter {
585    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
586        let sink_proto = param.to_proto();
587        let stream_handle = EmbeddedConnectorClient::new()?
588            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
589            .await?;
590
591        Ok(Self {
592            epoch: None,
593            batch_id: 0,
594            stream_handle,
595            metrics,
596        })
597    }
598
599    #[cfg(test)]
600    fn for_test(
601        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
602        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
603    ) -> CoordinatedRemoteSinkWriter {
604        use futures::StreamExt;
605
606        let stream_handle = SinkWriterStreamHandle::for_test(
607            request_sender,
608            ReceiverStream::new(response_receiver)
609                .map_err(RpcError::from)
610                .boxed(),
611        );
612
613        CoordinatedRemoteSinkWriter {
614            epoch: None,
615            batch_id: 0,
616            stream_handle,
617            metrics: SinkWriterMetrics::for_test(),
618        }
619    }
620}
621
622#[async_trait]
623impl SinkWriter for CoordinatedRemoteSinkWriter {
624    type CommitMetadata = Option<SinkMetadata>;
625
626    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
627        let cardinality = chunk.cardinality();
628        self.metrics
629            .connector_sink_rows_received
630            .inc_by(cardinality as _);
631
632        let epoch = self.epoch.ok_or_else(|| {
633            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
634        })?;
635        let batch_id = self.batch_id;
636        self.stream_handle
637            .request_sender
638            .send_request(JniSinkWriterStreamRequest::Chunk {
639                chunk,
640                epoch,
641                batch_id,
642            })
643            .await?;
644        self.batch_id += 1;
645        Ok(())
646    }
647
648    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
649        self.epoch = Some(epoch);
650        Ok(())
651    }
652
653    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
654        let epoch = self.epoch.ok_or_else(|| {
655            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
656        })?;
657        if is_checkpoint {
658            // TODO: add metrics to measure commit time
659            let rsp = self.stream_handle.commit(epoch).await?;
660            rsp.metadata
661                .ok_or_else(|| {
662                    SinkError::Remote(anyhow!(
663                        "get none metadata in commit response for coordinated sink writer"
664                    ))
665                })
666                .map(Some)
667        } else {
668            self.stream_handle.barrier(epoch).await?;
669            Ok(None)
670        }
671    }
672}
673
674pub struct RemoteCoordinator {
675    stream_handle: SinkCoordinatorStreamHandle,
676}
677
678impl RemoteCoordinator {
679    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
680        let stream_handle = EmbeddedConnectorClient::new()?
681            .start_sink_coordinator_stream(param.clone())
682            .await?;
683
684        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
685
686        Ok(RemoteCoordinator { stream_handle })
687    }
688}
689
690#[async_trait]
691impl SinglePhaseCommitCoordinator for RemoteCoordinator {
692    async fn init(&mut self) -> Result<()> {
693        Ok(())
694    }
695
696    async fn commit(
697        &mut self,
698        epoch: u64,
699        metadata: Vec<SinkMetadata>,
700        add_columns: Option<Vec<Field>>,
701    ) -> Result<()> {
702        if let Some(add_columns) = add_columns {
703            return Err(anyhow!(
704                "remote coordinator not support add columns, but got: {:?}",
705                add_columns
706            )
707            .into());
708        }
709        Ok(self.stream_handle.commit(epoch, metadata).await?)
710    }
711}
712
713struct EmbeddedConnectorClient {
714    jvm: Jvm,
715}
716
717impl EmbeddedConnectorClient {
718    fn new() -> Result<Self> {
719        let jvm = Jvm::get_or_init().context("failed to create EmbeddedConnectorClient")?;
720        Ok(EmbeddedConnectorClient { jvm })
721    }
722
723    async fn start_sink_writer_stream(
724        &self,
725        payload_schema: Option<TableSchema>,
726        sink_proto: PbSinkParam,
727    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
728        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
729            SinkWriterStreamRequest {
730                request: Some(SinkRequest::Start(StartSink {
731                    sink_param: Some(sink_proto),
732                    payload_schema,
733                })),
734            },
735            |rx| async move {
736                let rx = self.start_jvm_worker_thread(
737                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
738                    "runJniSinkWriterThread",
739                    rx,
740                );
741                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
742            },
743        )
744        .await?;
745
746        match first_rsp {
747            SinkWriterStreamResponse {
748                response: Some(sink_writer_stream_response::Response::Start(_)),
749            } => Ok(handle),
750            msg => Err(SinkError::Internal(anyhow!(
751                "should get start response but get {:?}",
752                msg
753            ))),
754        }
755    }
756
757    async fn start_sink_coordinator_stream(
758        &self,
759        param: SinkParam,
760    ) -> Result<SinkCoordinatorStreamHandle> {
761        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
762            SinkCoordinatorStreamRequest {
763                request: Some(sink_coordinator_stream_request::Request::Start(
764                    StartCoordinator {
765                        param: Some(param.to_proto()),
766                    },
767                )),
768            },
769            |rx| async move {
770                let rx = self.start_jvm_worker_thread(
771                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
772                    "runJniSinkCoordinatorThread",
773                    rx,
774                );
775                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
776            },
777        )
778        .await?;
779
780        match first_rsp {
781            SinkCoordinatorStreamResponse {
782                response: Some(sink_coordinator_stream_response::Response::Start(_)),
783            } => Ok(handle),
784            msg => Err(SinkError::Internal(anyhow!(
785                "should get start response but get {:?}",
786                msg
787            ))),
788        }
789    }
790
    /// Spawns a dedicated OS thread that calls the static Java method
    /// `class_name.method_name` through JNI and returns the receiving end of the
    /// response channel.
    ///
    /// The Java method has signature `{{void}, {long requestRx, long responseTx}}`.
    /// It receives raw pointers to `request_rx` and to this function's
    /// `response_tx`. Both values are moved into the spawned thread's closure, so
    /// they stay alive for the whole duration of the JNI call that uses the pointers.
    ///
    /// Error handling:
    /// - An `Err` returned by the JNI call itself is only logged here; the closure
    ///   still returns `Ok(())`.
    /// - An `Err` returned by `execute_with_jni_env` is sent on the response
    ///   channel as a final `Err` item. The send is best-effort: its result is
    ///   ignored.
    ///
    /// NOTE(review): because the closure always returns `Ok(())`, a JNI call error
    /// reaches the consumer only if `execute_with_jni_env` reports it on its own
    /// (presumably for a pending Java exception). Confirm this is intended.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        // Copy the JVM handle out of `self` so the spawned (`'static`) closure does
        // not need to borrow `self`.
        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                // The pointers refer to locals owned by this thread; they remain valid
                // until this call returns because nothing moves or drops them before then.
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                // The outcome of the JNI call is only logged, not propagated.
                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Forward failures reported by `execute_with_jni_env` to the consumer.
            // A failed send (e.g. receiver already gone) is deliberately ignored.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
830}
831
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::remote::CoordinatedRemoteSinkWriter;
    use crate::sink::writer::SinkWriter;

    /// Before any `begin_epoch`, both `barrier` and `write_batch` must return an
    /// error and must not push anything onto the request channel.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        // The response sender is discarded right away; this test feeds no responses.
        let (_, response_rx) = mpsc::channel(16);

        let mut sink_writer = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
        ",
        );

        // A barrier without an epoch must fail (within the timeout).
        let barrier_outcome =
            tokio::time::timeout(Duration::from_secs(10), sink_writer.barrier(true))
                .await
                .expect("test failed: should not commit without epoch");
        assert!(
            barrier_outcome.is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // A write without an epoch must fail as well.
        let write_outcome =
            tokio::time::timeout(Duration::from_secs(1), sink_writer.write_batch(chunk))
                .await
                .expect("test failed: should not write without epoch");
        assert!(
            write_outcome.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Drives a writer through two epochs and checks the forwarded chunk and
    /// barrier requests, together with the writer's epoch / batch-id bookkeeping.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        let (response_tx, response_rx) = mpsc::channel(16);
        let mut sink_writer = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
        ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
        ",
        );

        // Epoch 2022: the first chunk is forwarded with batch id 0.
        sink_writer.begin_epoch(2022).await.unwrap();
        assert_eq!(sink_writer.epoch, Some(2022));

        sink_writer.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink_writer.epoch, Some(2022));
        assert_eq!(sink_writer.batch_id, 1);
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2022);
        assert_eq!(batch_id, 0);
        assert_eq!(chunk, chunk_a);

        // Queue a commit response for epoch 2022, then call `barrier(false)`, which
        // must be forwarded as a `Barrier` request with `is_checkpoint: false`.
        response_tx
            .send(Ok(SinkWriterStreamResponse {
                response: Some(Response::Commit(CommitResponse {
                    epoch: 2022,
                    metadata: None,
                })),
            }))
            .await
            .expect("test failed: failed to sync epoch");
        sink_writer.barrier(false).await.unwrap();
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch,
                    is_checkpoint: false,
                })),
        }) = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(epoch, 2022);

        // Epoch 2023: the batch id keeps counting across epochs.
        sink_writer.begin_epoch(2023).await.unwrap();
        assert_eq!(sink_writer.epoch, Some(2023));

        sink_writer.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink_writer.epoch, Some(2023));
        assert_eq!(sink_writer.batch_id, 2);
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2023);
        assert_eq!(batch_id, 1);
        assert_eq!(chunk, chunk_b);
    }
}
972}