risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use jni::JavaVM;
28use phf::phf_set;
29use prost::Message;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::bail;
32use risingwave_common::catalog::{ColumnDesc, ColumnId};
33use risingwave_common::global_jvm::JVM;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::types::DataType;
36use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
37use risingwave_jni_core::{
38    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
39};
40use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
41use risingwave_pb::connector_service::sink_writer_stream_request::{
42    Request as SinkRequest, StartSink,
43};
44use risingwave_pb::connector_service::{
45    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
46    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
47    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
48    sink_writer_stream_response,
49};
50use risingwave_rpc_client::error::RpcError;
51use risingwave_rpc_client::{
52    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
53    SinkWriterStreamHandle,
54};
55use rw_futures_util::drop_either_future;
56use sea_orm::DatabaseConnection;
57use thiserror_ext::AsReport;
58use tokio::sync::mpsc;
59use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
60use tokio::task::spawn_blocking;
61use tokio_stream::wrappers::ReceiverStream;
62use tracing::warn;
63
64use super::SinkCommittedEpochSubscriber;
65use super::elasticsearch_opensearch::elasticsearch_converter::{
66    StreamChunkConverter, is_remote_es_sink,
67};
68use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
69use crate::connector_common::IcebergSinkCompactionUpdate;
70use crate::enforce_secret::EnforceSecret;
71use crate::error::ConnectorResult;
72use crate::sink::coordinate::CoordinatedLogSinker;
73use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
74use crate::sink::writer::SinkWriter;
75use crate::sink::{
76    DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
77    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
78};
79
80macro_rules! def_remote_sink {
81    () => {
82        def_remote_sink! {
83            //todo!, delete java impl
84            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
85            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
86            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
87            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
88        }
89    };
90    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
91        #[derive(Debug)]
92        pub struct $variant_name;
93        impl RemoteSinkTrait for $variant_name {
94            const SINK_NAME: &'static str = $sink_name;
95        }
96        impl EnforceSecret for $variant_name {
97            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
98                $($enforce_secret_prop),*
99            };
100        }
101        pub type $sink_type_name = RemoteSink<$variant_name>;
102    };
103    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
104        #[derive(Debug)]
105        pub struct $variant_name;
106        impl RemoteSinkTrait for $variant_name {
107            const SINK_NAME: &'static str = $sink_name;
108            fn default_sink_decouple($desc: &SinkDesc) -> bool {
109                $body
110            }
111        }
112        impl EnforceSecret for $variant_name {
113            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
114                $($enforce_secret_prop),*
115            };
116        }
117        pub type $sink_type_name = RemoteSink<$variant_name>;
118    };
119    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
120        def_remote_sink! {
121            {$($first)+}
122        }
123        def_remote_sink! {
124            $({$($rest)+})*
125        }
126    };
127    ($($invalid:tt)*) => {
128        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
129    }
130}
131
132def_remote_sink!();
133
/// Compile-time description of one remote sink flavour.
///
/// Implementors are the unit marker types generated by `def_remote_sink!`; they are used as
/// the type parameter of `RemoteSink` and `CoordinatedRemoteSink`.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Connector name; re-exported as `Sink::SINK_NAME` by the sinks parameterised on this type.
    const SINK_NAME: &'static str;
    /// Value used for sink decoupling when the user leaves it at `SinkDecouple::Default`.
    /// Defaults to `true`.
    fn default_sink_decouple() -> bool {
        true
    }
}
140
/// A sink of flavour `R` whose validation and writing are delegated to the embedded JVM.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; handed to validation and cloned into every log sinker.
    param: SinkParam,
    /// Ties the sink to its flavour `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
146
/// The secret-enforced properties of a `RemoteSink` are exactly those of its flavour `R`.
impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
150
151impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
152    type Error = SinkError;
153
154    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
155        Ok(Self {
156            param,
157            _phantom: PhantomData,
158        })
159    }
160}
161
162impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
163    type Coordinator = DummySinkCommitCoordinator;
164    type LogSinker = RemoteLogSinker;
165
166    const SINK_NAME: &'static str = R::SINK_NAME;
167
168    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
169        match user_specified {
170            SinkDecouple::Default => Ok(R::default_sink_decouple()),
171            SinkDecouple::Enable => Ok(true),
172            SinkDecouple::Disable => Ok(false),
173        }
174    }
175
176    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
177        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
178    }
179
180    async fn validate(&self) -> Result<()> {
181        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
182        Ok(())
183    }
184}
185
186async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
187    // if sink_name == OpenSearchJavaSink::SINK_NAME {
188    //     risingwave_common::license::Feature::OpenSearchSink
189    //         .check_available()
190    //         .map_err(|e| anyhow::anyhow!(e))?;
191    // }
192    if is_remote_es_sink(sink_name)
193        && param.downstream_pk.len() > 1
194        && !param.properties.contains_key(ES_OPTION_DELIMITER)
195    {
196        bail!("Es sink only supports single pk or pk with delimiter option");
197    }
198    // FIXME: support struct and array in stream sink
199    param.columns.iter().try_for_each(|col| {
200        match &col.data_type {
201            DataType::Int16
202                    | DataType::Int32
203                    | DataType::Int64
204                    | DataType::Float32
205                    | DataType::Float64
206                    | DataType::Boolean
207                    | DataType::Decimal
208                    | DataType::Timestamp
209                    | DataType::Timestamptz
210                    | DataType::Varchar
211                    | DataType::Date
212                    | DataType::Time
213                    | DataType::Interval
214                    | DataType::Jsonb
215                    | DataType::Bytea => Ok(()),
216            DataType::List(list) => {
217                if is_remote_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
218                    Ok(())
219                } else{
220                    Err(SinkError::Remote(anyhow!(
221                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
222                        col.name,
223                        col.data_type,
224                    )))
225                }
226            },
227            DataType::Struct(_) => {
228                if is_remote_es_sink(sink_name){
229                    Ok(())
230                }else{
231                    Err(SinkError::Remote(anyhow!(
232                        "Only Es sink supports struct, got {:?}: {:?}",
233                        col.name,
234                        col.data_type,
235                    )))
236                }
237            },
238            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
239            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
240                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
241                            col.name,
242                            col.data_type,
243                        )))}})?;
244
245    let jvm = JVM.get_or_init()?;
246    let sink_param = param.to_proto();
247
248    spawn_blocking(move || -> anyhow::Result<()> {
249        execute_with_jni_env(jvm, |env| {
250            let validate_sink_request = ValidateSinkRequest {
251                sink_param: Some(sink_param),
252            };
253            let validate_sink_request_bytes =
254                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
255
256            let validate_sink_response_bytes = call_static_method!(
257                env,
258                {com.risingwave.connector.JniSinkValidationHandler},
259                {byte[] validate(byte[] validateSourceRequestBytes)},
260                &validate_sink_request_bytes
261            )?;
262
263            let validate_sink_response: ValidateSinkResponse = Message::decode(
264                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
265            )?;
266
267            validate_sink_response.error.map_or_else(
268                || Ok(()), // If there is no error message, return Ok here.
269                |err| bail!("sink cannot pass validation: {}", err.error_message),
270            )
271        })
272    })
273    .await
274    .context("JoinHandle returns error")??;
275
276    Ok(())
277}
278
/// Log sinker that forwards log-store items to a JVM sink writer and truncates the log as
/// the writer acknowledges them.
pub struct RemoteLogSinker {
    /// Sends chunk and barrier requests to the JVM sink writer.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Stream of responses coming back from the JVM sink writer.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts every stream chunk before it is sent.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while sinking (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
285
286impl RemoteLogSinker {
287    async fn new(
288        sink_param: SinkParam,
289        writer_param: SinkWriterParam,
290        sink_name: &str,
291    ) -> Result<Self> {
292        let sink_proto = sink_param.to_proto();
293        let payload_schema = if is_remote_es_sink(sink_name) {
294            let columns = vec![
295                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
296                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
297                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
298                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
299            ];
300            Some(TableSchema {
301                columns,
302                pk_indices: vec![],
303            })
304        } else {
305            sink_proto.table_schema.clone()
306        };
307
308        let SinkWriterStreamHandle {
309            request_sender,
310            response_stream,
311        } = EmbeddedConnectorClient::new()?
312            .start_sink_writer_stream(payload_schema, sink_proto)
313            .await?;
314
315        Ok(RemoteLogSinker {
316            request_sender,
317            response_stream,
318            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
319            stream_chunk_converter: StreamChunkConverter::new(
320                sink_name,
321                sink_param.schema(),
322                &sink_param.downstream_pk,
323                &sink_param.properties,
324                sink_param.sink_type.is_append_only(),
325            )?,
326        })
327    }
328}
329
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Drives the sink: reads items from `log_reader`, sends them to the JVM writer, and
    /// truncates the log whenever the writer acknowledges a chunk or a barrier.
    ///
    /// Two futures are raced against each other:
    /// * one forwards every response of the JVM stream into an in-process channel,
    /// * the other sends log items and consumes the forwarded responses.
    ///
    /// The return type is `Result<!>`: the function never returns `Ok`, it only ends with
    /// the first error produced by either future.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        // Responses are handed from the polling future to the consuming future through
        // this unbounded channel.
        let (response_tx, mut response_rx) = unbounded_channel();

        // Forwards responses until the stream ends or fails; both cases are errors.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            // Handles an acknowledgement for `persisted_offset`:
            // 1. drops all queued offsets that are smaller than the acknowledged one,
            // 2. requires the next queued offset to be exactly the acknowledged one,
            // 3. records the commit duration when it is a barrier with a start time,
            // 4. truncates the log reader at the acknowledged offset.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // An empty queue here means the response refers to an offset never sent.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item sent; used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Push from back and pop from front
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a writer response or the next log item.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    futures::future::Either::Left(opt) => {
                        // `None` means the forwarding side of the channel is gone.
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A chunk (batch) was written.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A barrier was committed.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Commit metadata is not expected here; it is logged and ignored.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                // Only remember the offset once the request has been sent.
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Only checkpoint barriers carry a start time, so only they
                                // contribute to the commit-duration metric.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Whichever future finishes first decides the result; the other one is dropped.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
525
/// A remote sink of flavour `R` that commits through a `RemoteCoordinator`.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Sink parameters; handed to validation and cloned for writers and the coordinator.
    param: SinkParam,
    /// Ties the sink to its flavour `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
531
/// The secret-enforced properties of a `CoordinatedRemoteSink` are exactly those of its
/// flavour `R`.
impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
535
536impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
537    type Error = SinkError;
538
539    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
540        Ok(Self {
541            param,
542            _phantom: PhantomData,
543        })
544    }
545}
546
547impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
548    type Coordinator = RemoteCoordinator;
549    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
550
551    const SINK_NAME: &'static str = R::SINK_NAME;
552
553    async fn validate(&self) -> Result<()> {
554        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
555        Ok(())
556    }
557
558    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
559        let metrics = SinkWriterMetrics::new(&writer_param);
560        CoordinatedLogSinker::new(
561            &writer_param,
562            self.param.clone(),
563            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
564            NonZero::new(1).unwrap(),
565        )
566        .await
567    }
568
569    fn is_coordinated_sink(&self) -> bool {
570        true
571    }
572
573    async fn new_coordinator(
574        &self,
575        _db: DatabaseConnection,
576        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
577    ) -> Result<Self::Coordinator> {
578        RemoteCoordinator::new::<R>(self.param.clone()).await
579    }
580}
581
/// Sink writer that forwards chunks and barriers to a JVM sink writer stream and returns the
/// commit metadata of checkpoints.
pub struct CoordinatedRemoteSinkWriter {
    /// Epoch set by the latest `begin_epoch`; `None` until the first call.
    epoch: Option<u64>,
    /// Id attached to the next chunk; starts at 0 and grows by one per chunk sent.
    batch_id: u64,
    /// Request/response stream to the JVM sink writer.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated while writing (rows received).
    metrics: SinkWriterMetrics,
}
588
589impl CoordinatedRemoteSinkWriter {
590    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
591        let sink_proto = param.to_proto();
592        let stream_handle = EmbeddedConnectorClient::new()?
593            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
594            .await?;
595
596        Ok(Self {
597            epoch: None,
598            batch_id: 0,
599            stream_handle,
600            metrics,
601        })
602    }
603
604    #[cfg(test)]
605    fn for_test(
606        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
607        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
608    ) -> CoordinatedRemoteSinkWriter {
609        use futures::StreamExt;
610
611        let stream_handle = SinkWriterStreamHandle::for_test(
612            request_sender,
613            ReceiverStream::new(response_receiver)
614                .map_err(RpcError::from)
615                .boxed(),
616        );
617
618        CoordinatedRemoteSinkWriter {
619            epoch: None,
620            batch_id: 0,
621            stream_handle,
622            metrics: SinkWriterMetrics::for_test(),
623        }
624    }
625}
626
627#[async_trait]
628impl SinkWriter for CoordinatedRemoteSinkWriter {
629    type CommitMetadata = Option<SinkMetadata>;
630
631    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
632        let cardinality = chunk.cardinality();
633        self.metrics
634            .connector_sink_rows_received
635            .inc_by(cardinality as _);
636
637        let epoch = self.epoch.ok_or_else(|| {
638            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
639        })?;
640        let batch_id = self.batch_id;
641        self.stream_handle
642            .request_sender
643            .send_request(JniSinkWriterStreamRequest::Chunk {
644                chunk,
645                epoch,
646                batch_id,
647            })
648            .await?;
649        self.batch_id += 1;
650        Ok(())
651    }
652
653    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
654        self.epoch = Some(epoch);
655        Ok(())
656    }
657
658    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
659        let epoch = self.epoch.ok_or_else(|| {
660            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
661        })?;
662        if is_checkpoint {
663            // TODO: add metrics to measure commit time
664            let rsp = self.stream_handle.commit(epoch).await?;
665            rsp.metadata
666                .ok_or_else(|| {
667                    SinkError::Remote(anyhow!(
668                        "get none metadata in commit response for coordinated sink writer"
669                    ))
670                })
671                .map(Some)
672        } else {
673            self.stream_handle.barrier(epoch).await?;
674            Ok(None)
675        }
676    }
677}
678
/// Commit coordinator that forwards commits to a JVM sink coordinator.
pub struct RemoteCoordinator {
    /// Request/response stream to the JVM sink coordinator.
    stream_handle: SinkCoordinatorStreamHandle,
}
682
683impl RemoteCoordinator {
684    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
685        let stream_handle = EmbeddedConnectorClient::new()?
686            .start_sink_coordinator_stream(param.clone())
687            .await?;
688
689        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
690
691        Ok(RemoteCoordinator { stream_handle })
692    }
693}
694
695#[async_trait]
696impl SinkCommitCoordinator for RemoteCoordinator {
697    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
698        Ok(None)
699    }
700
701    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
702        Ok(self.stream_handle.commit(epoch, metadata).await?)
703    }
704}
705
/// Client for the connector code running in the in-process JVM.
struct EmbeddedConnectorClient {
    /// The process-wide JVM, as returned by `JVM.get_or_init()`.
    jvm: &'static JavaVM,
}
709
impl EmbeddedConnectorClient {
    /// Creates a client, initialising the JVM if necessary.
    ///
    /// # Errors
    /// Fails if the JVM cannot be obtained.
    fn new() -> Result<Self> {
        let jvm = JVM
            .get_or_init()
            .context("failed to create EmbeddedConnectorClient")?;
        Ok(EmbeddedConnectorClient { jvm })
    }

    /// Starts a sink writer in the JVM and returns the handle of its stream.
    ///
    /// A `StartSink` request carrying `sink_proto` and `payload_schema` is sent first, and
    /// the first response must be a `Start` response.
    ///
    /// # Errors
    /// Fails if the stream cannot be initialised or the first response is not `Start`.
    async fn start_sink_writer_stream(
        &self,
        payload_schema: Option<TableSchema>,
        sink_proto: PbSinkParam,
    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
            SinkWriterStreamRequest {
                request: Some(SinkRequest::Start(StartSink {
                    sink_param: Some(sink_proto),
                    payload_schema,
                })),
            },
            // Given the request receiver, start the JVM worker and expose its responses
            // as a stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
                    "runJniSinkWriterThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkWriterStreamResponse {
                response: Some(sink_writer_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Starts a sink coordinator in the JVM and returns the handle of its stream.
    ///
    /// A `StartCoordinator` request carrying `param` is sent first, and the first response
    /// must be a `Start` response.
    ///
    /// # Errors
    /// Fails if the stream cannot be initialised or the first response is not `Start`.
    async fn start_sink_coordinator_stream(
        &self,
        param: SinkParam,
    ) -> Result<SinkCoordinatorStreamHandle> {
        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
            SinkCoordinatorStreamRequest {
                request: Some(sink_coordinator_stream_request::Request::Start(
                    StartCoordinator {
                        param: Some(param.to_proto()),
                    },
                )),
            },
            // Given the request receiver, start the JVM worker and expose its responses
            // as a stream.
            |rx| async move {
                let rx = self.start_jvm_worker_thread(
                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
                    "runJniSinkCoordinatorThread",
                    rx,
                );
                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
            },
        )
        .await?;

        match first_rsp {
            SinkCoordinatorStreamResponse {
                response: Some(sink_coordinator_stream_response::Response::Start(_)),
            } => Ok(handle),
            msg => Err(SinkError::Internal(anyhow!(
                "should get start response but get {:?}",
                msg
            ))),
        }
    }

    /// Spawns an OS thread that runs the static Java method `class_name::method_name` and
    /// returns the receiver on which its responses arrive.
    ///
    /// The Java method receives the addresses of `request_rx` and of the response sender as
    /// two `long` arguments.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        let jvm = self.jvm;
        std::thread::spawn(move || {
            let result = execute_with_jni_env(jvm, |env| {
                // Both channel ends are owned by this thread's closure, so the raw
                // pointers handed to Java stay valid for the whole call.
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                // A failure of the Java call is only logged; it is not forwarded to the
                // response channel because the closure still returns `Ok`.
                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // An error from `execute_with_jni_env` itself is reported to the receiver;
            // a failed send is ignored.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
}
826
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::SinkWriter;
    use crate::sink::remote::CoordinatedRemoteSinkWriter;

    /// `barrier` and `write_batch` must both fail, and must not emit any request, when no
    /// epoch has been started.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        // The response sender is dropped right away; only the receiver is handed over.
        let (_, response_rx) = mpsc::channel(16);

        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
        ",
        );

        // test epoch check
        let barrier_outcome = tokio::time::timeout(Duration::from_secs(10), sink.barrier(true))
            .await
            .expect("test failed: should not commit without epoch");
        assert!(
            barrier_outcome.is_err(),
            "test failed: no epoch check for commit()"
        );
        let pending_request = request_rx.try_recv();
        assert!(
            pending_request.is_err(),
            "test failed: unchecked epoch before request"
        );

        let write_outcome = tokio::time::timeout(Duration::from_secs(1), sink.write_batch(chunk))
            .await
            .expect("test failed: should not write without epoch");
        assert!(
            write_outcome.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        let pending_request = request_rx.try_recv();
        assert!(
            pending_request.is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Drives a writer through write -> barrier -> next epoch -> write and checks both the
    /// writer's `epoch` / `batch_id` fields and the requests it emits.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        let (response_tx, response_rx) = mpsc::channel(16);
        let mut sink = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
        ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
        ",
        );

        // test write batch
        sink.begin_epoch(2022).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));

        sink.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2022));
        assert_eq!(sink.batch_id, 1);
        let first_write = request_rx.recv().await.unwrap();
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = first_write
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2022);
        assert_eq!(batch_id, 0);
        assert_eq!(chunk, chunk_a);

        // test commit: the commit response is queued before the barrier is issued
        let commit_response = SinkWriterStreamResponse {
            response: Some(Response::Commit(CommitResponse {
                epoch: 2022,
                metadata: None,
            })),
        };
        response_tx
            .send(Ok(commit_response))
            .await
            .expect("test failed: failed to sync epoch");
        sink.barrier(false).await.unwrap();
        let commit_request = request_rx.recv().await.unwrap();
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch,
                    is_checkpoint: false,
                })),
        }) = commit_request
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(epoch, 2022);

        // begin another epoch
        sink.begin_epoch(2023).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));

        // test another write
        sink.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(sink.epoch, Some(2023));
        assert_eq!(sink.batch_id, 2);
        let second_write = request_rx.recv().await.unwrap();
        let JniSinkWriterStreamRequest::Chunk {
            epoch,
            batch_id,
            chunk,
        } = second_write
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(epoch, 2023);
        assert_eq!(batch_id, 1);
        assert_eq!(chunk, chunk_b);
    }
}