risingwave_connector/sink/
remote.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::marker::PhantomData;
17use std::num::NonZero;
18use std::ops::Deref;
19use std::pin::pin;
20use std::time::Instant;
21
22use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use await_tree::{InstrumentAwait, span};
25use futures::TryStreamExt;
26use futures::future::select;
27use phf::phf_set;
28use prost::Message;
29use risingwave_common::array::StreamChunk;
30use risingwave_common::bail;
31use risingwave_common::catalog::{ColumnDesc, ColumnId};
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::types::DataType;
35use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
36use risingwave_jni_core::{
37    JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, call_static_method, gen_class_name,
38};
39use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator;
40use risingwave_pb::connector_service::sink_writer_stream_request::{
41    Request as SinkRequest, StartSink,
42};
43use risingwave_pb::connector_service::{
44    PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
45    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
46    ValidateSinkResponse, sink_coordinator_stream_request, sink_coordinator_stream_response,
47    sink_writer_stream_response,
48};
49use risingwave_pb::stream_plan::PbSinkSchemaChange;
50use risingwave_rpc_client::error::RpcError;
51use risingwave_rpc_client::{
52    BidiStreamReceiver, BidiStreamSender, DEFAULT_BUFFER_SIZE, SinkCoordinatorStreamHandle,
53    SinkWriterStreamHandle,
54};
55use rw_futures_util::drop_either_future;
56use thiserror_ext::AsReport;
57use tokio::sync::mpsc;
58use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
59use tokio::task::spawn_blocking;
60use tokio_stream::wrappers::ReceiverStream;
61use tracing::warn;
62
63use super::elasticsearch_opensearch::elasticsearch_converter::{
64    StreamChunkConverter, is_remote_es_sink,
65};
66use super::elasticsearch_opensearch::elasticsearch_opensearch_config::ES_OPTION_DELIMITER;
67use crate::connector_common::IcebergSinkCompactionUpdate;
68use crate::enforce_secret::EnforceSecret;
69use crate::error::ConnectorResult;
70use crate::sink::coordinate::CoordinatedLogSinker;
71use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
72use crate::sink::writer::SinkWriter;
73use crate::sink::{
74    LogSinker, Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError,
75    SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam,
76};
77
/// Declares the remote sink variants provided by this file.
///
/// For each `{ Variant, SinkTypeAlias, "sink_name", [ secret props ] }` entry the macro emits:
/// - a unit struct `Variant`,
/// - `impl RemoteSinkTrait for Variant` with `SINK_NAME` set to `"sink_name"`,
/// - `impl EnforceSecret for Variant` whose `ENFORCE_SECRET_PROPERTIES` is the given list,
/// - `pub type SinkTypeAlias = RemoteSink<Variant>`.
///
/// Invoking the macro with no arguments expands to the built-in list of sinks.
macro_rules! def_remote_sink {
    // Entry point: the list of remote sinks defined by this file.
    () => {
        def_remote_sink! {
            // TODO: delete the Java implementation
            // { ElasticSearchJava, ElasticSearchJavaSink, "elasticsearch_v1" }
            // { OpensearchJava, OpenSearchJavaSink, "opensearch_v1"}
            { Cassandra, CassandraSink, "cassandra", [ "cassandra.url" ] }
            { Jdbc, JdbcSink, "jdbc", [ "jdbc.url" ] }
        }
    };
    // Single entry that keeps the trait's default `default_sink_decouple`.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ] }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Single entry with a custom `default_sink_decouple` body given as `|desc| expr`.
    // NOTE(review): the generated method takes a `&SinkDesc` argument, while
    // `RemoteSinkTrait::default_sink_decouple` as declared in this file takes none, and no
    // invocation visible in this file uses this arm — confirm whether it is still needed.
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr, [ $($enforce_secret_prop:expr),* ], |$desc:ident| $body:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
        impl RemoteSinkTrait for $variant_name {
            const SINK_NAME: &'static str = $sink_name;
            fn default_sink_decouple($desc: &SinkDesc) -> bool {
                $body
            }
        }
        impl EnforceSecret for $variant_name {
            const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
                $($enforce_secret_prop),*
            };
        }
        pub type $sink_type_name = RemoteSink<$variant_name>;
    };
    // Several entries: expand the first one, then recurse on the remaining ones.
    ({ $($first:tt)+ } $({$($rest:tt)+})*) => {
        def_remote_sink! {
            {$($first)+}
        }
        def_remote_sink! {
            $({$($rest)+})*
        }
    };
    // Anything else is rejected at compile time, echoing the offending tokens.
    ($($invalid:tt)*) => {
        compile_error! {concat! {"invalid `", stringify!{$($invalid)*}, "`"}}
    }
}

// Expand the built-in sink list (the `()` arm above).
def_remote_sink!();
131
/// Compile-time description of one remote sink variant.
///
/// Implemented by the unit structs generated by `def_remote_sink!` and used as the type
/// parameter of `RemoteSink` / `CoordinatedRemoteSink`.
pub trait RemoteSinkTrait: EnforceSecret + Send + Sync + 'static {
    /// Name of the sink; the generic sink types expose it as their `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// Whether sink decoupling is enabled when the user leaves the setting at its default.
    /// Returns `true` unless an implementor overrides it.
    fn default_sink_decouple() -> bool {
        true
    }
}
138
/// A sink for the remote variant `R`; it only stores the sink parameters.
#[derive(Debug)]
pub struct RemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created from; cloned for each new log sinker.
    param: SinkParam,
    /// Ties the sink to its variant `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
144
/// The secret-enforced properties of a `RemoteSink<R>` are exactly those declared by `R`.
impl<R: RemoteSinkTrait> EnforceSecret for RemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
148
149impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
150    type Error = SinkError;
151
152    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
153        Ok(Self {
154            param,
155            _phantom: PhantomData,
156        })
157    }
158}
159
160impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
161    type LogSinker = RemoteLogSinker;
162
163    const SINK_NAME: &'static str = R::SINK_NAME;
164
165    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
166        match user_specified {
167            SinkDecouple::Default => Ok(R::default_sink_decouple()),
168            SinkDecouple::Enable => Ok(true),
169            SinkDecouple::Disable => Ok(false),
170        }
171    }
172
173    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
174        RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await
175    }
176
177    async fn validate(&self) -> Result<()> {
178        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
179        Ok(())
180    }
181}
182
183async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
184    // if sink_name == OpenSearchJavaSink::SINK_NAME {
185    //     risingwave_common::license::Feature::OpenSearchSink
186    //         .check_available()
187    //         .map_err(|e| anyhow::anyhow!(e))?;
188    // }
189    if is_remote_es_sink(sink_name)
190        && param.downstream_pk_or_empty().len() > 1
191        && !param.properties.contains_key(ES_OPTION_DELIMITER)
192    {
193        bail!("Es sink only supports single pk or pk with delimiter option");
194    }
195    // FIXME: support struct and array in stream sink
196    param.columns.iter().try_for_each(|col| {
197        match &col.data_type {
198            DataType::Int16
199                    | DataType::Int32
200                    | DataType::Int64
201                    | DataType::Float32
202                    | DataType::Float64
203                    | DataType::Boolean
204                    | DataType::Decimal
205                    | DataType::Timestamp
206                    | DataType::Timestamptz
207                    | DataType::Varchar
208                    | DataType::Date
209                    | DataType::Time
210                    | DataType::Interval
211                    | DataType::Jsonb
212                    | DataType::Bytea => Ok(()),
213            DataType::List(list) => {
214                if is_remote_es_sink(sink_name) || matches!(list.elem(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
215                    Ok(())
216                } else{
217                    Err(SinkError::Remote(anyhow!(
218                        "Remote sink only supports list<int16, int32, int64, float, double, varchar>, got {:?}: {:?}",
219                        col.name,
220                        col.data_type,
221                    )))
222                }
223            },
224            DataType::Struct(_) => {
225                if is_remote_es_sink(sink_name){
226                    Ok(())
227                }else{
228                    Err(SinkError::Remote(anyhow!(
229                        "Only Es sink supports struct, got {:?}: {:?}",
230                        col.name,
231                        col.data_type,
232                    )))
233                }
234            },
235            DataType::Vector(_) |
236            DataType::Serial | DataType::Int256 | DataType::Map(_) => Err(SinkError::Remote(anyhow!(
237                            "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
238                            col.name,
239                            col.data_type,
240                        )))}})?;
241
242    let jvm = Jvm::get_or_init()?;
243    let sink_param = param.to_proto();
244
245    spawn_blocking(move || -> anyhow::Result<()> {
246        execute_with_jni_env(jvm, |env| {
247            let validate_sink_request = ValidateSinkRequest {
248                sink_param: Some(sink_param),
249            };
250            let validate_sink_request_bytes =
251                env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;
252
253            let validate_sink_response_bytes = call_static_method!(
254                env,
255                {com.risingwave.connector.JniSinkValidationHandler},
256                {byte[] validate(byte[] validateSourceRequestBytes)},
257                &validate_sink_request_bytes
258            )?;
259
260            let validate_sink_response: ValidateSinkResponse = Message::decode(
261                risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, env)?.deref(),
262            )?;
263
264            validate_sink_response.error.map_or_else(
265                || Ok(()), // If there is no error message, return Ok here.
266                |err| bail!("sink cannot pass validation: {}", err.error_message),
267            )
268        })
269    })
270    .await
271    .context("JoinHandle returns error")??;
272
273    Ok(())
274}
275
/// Log sinker that forwards log store items to a JNI sink writer stream and truncates the log
/// according to the responses received from it.
pub struct RemoteLogSinker {
    /// Sends chunk and barrier requests to the JNI sink writer.
    request_sender: BidiStreamSender<JniSinkWriterStreamRequest>,
    /// Responses coming back from the JNI sink writer.
    response_stream: BidiStreamReceiver<SinkWriterStreamResponse>,
    /// Converts each stream chunk before it is sent.
    stream_chunk_converter: StreamChunkConverter,
    /// Metrics updated while consuming the log (rows received, commit duration).
    sink_writer_metrics: SinkWriterMetrics,
}
282
283impl RemoteLogSinker {
284    async fn new(
285        sink_param: SinkParam,
286        writer_param: SinkWriterParam,
287        sink_name: &str,
288    ) -> Result<Self> {
289        let sink_proto = sink_param.to_proto();
290        let payload_schema = if is_remote_es_sink(sink_name) {
291            let columns = vec![
292                ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
293                ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
294                ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(),
295                ColumnDesc::unnamed(ColumnId::from(2), DataType::Varchar).to_protobuf(),
296            ];
297            Some(TableSchema {
298                columns,
299                pk_indices: vec![],
300            })
301        } else {
302            sink_proto.table_schema.clone()
303        };
304
305        let SinkWriterStreamHandle {
306            request_sender,
307            response_stream,
308        } = EmbeddedConnectorClient::new()?
309            .start_sink_writer_stream(payload_schema, sink_proto)
310            .await?;
311
312        Ok(RemoteLogSinker {
313            request_sender,
314            response_stream,
315            sink_writer_metrics: SinkWriterMetrics::new(&writer_param),
316            stream_chunk_converter: StreamChunkConverter::new(
317                sink_name,
318                sink_param.schema(),
319                &sink_param.downstream_pk_or_empty(),
320                &sink_param.properties,
321                sink_param.sink_type.is_append_only(),
322            )?,
323        })
324    }
325}
326
#[async_trait]
impl LogSinker for RemoteLogSinker {
    /// Consumes `log_reader` and forwards its items to the remote sink writer.
    ///
    /// Two futures run concurrently:
    /// - `poll_response_stream` pulls responses off the writer's response stream and pushes
    ///   them into an unbounded channel;
    /// - `poll_consume_log_and_sink` alternates between handling those responses (truncating
    ///   the log up to the acknowledged offset) and sending the next log item to the writer.
    ///
    /// Neither future completes successfully, so the function only returns with an error.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut request_tx = self.request_sender;
        let mut response_err_stream_rx = self.response_stream;
        let sink_writer_metrics = self.sink_writer_metrics;

        let (response_tx, mut response_rx) = unbounded_channel();

        // Forwards every response into `response_tx`; ends with an error when the response
        // stream yields an error, ends, or when the channel receiver is gone.
        let poll_response_stream = async move {
            loop {
                let result = response_err_stream_rx
                    .stream
                    .try_next()
                    .instrument_await("log_sinker_wait_next_response")
                    .await;
                match result {
                    Ok(Some(response)) => {
                        response_tx.send(response).map_err(|err| {
                            SinkError::Remote(anyhow!("unable to send response: {:?}", err.0))
                        })?;
                    }
                    Ok(None) => return Err(SinkError::Remote(anyhow!("end of response stream"))),
                    Err(e) => return Err(SinkError::Remote(anyhow!(e))),
                }
            }
        };

        let poll_consume_log_and_sink = async move {
            /// Matches `persisted_offset` against the queue of sent offsets and truncates the
            /// log at that offset.
            ///
            /// Queued offsets smaller than `persisted_offset` are dropped first; the next
            /// queued offset must then equal `persisted_offset`, otherwise an error is
            /// returned. For a barrier offset that was queued with a start time, the elapsed
            /// time in milliseconds is recorded in `sink_commit_duration`.
            fn truncate_matched_offset(
                queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
                persisted_offset: TruncateOffset,
                log_reader: &mut impl SinkLogReader,
                sink_writer_metrics: &SinkWriterMetrics,
            ) -> Result<()> {
                // Skip sent offsets that precede the acknowledged one.
                while let Some((sent_offset, _)) = queue.front()
                    && sent_offset < &persisted_offset
                {
                    queue.pop_front();
                }

                // An empty queue here means the response refers to an offset never sent.
                let (sent_offset, start_time) = queue.pop_front().ok_or_else(|| {
                    anyhow!("get unsent offset {:?} in response", persisted_offset)
                })?;
                if sent_offset != persisted_offset {
                    bail!(
                        "new response offset {:?} does not match the buffer offset {:?}",
                        persisted_offset,
                        sent_offset
                    );
                }

                // Only barriers queued with a start time (checkpoint barriers) are timed.
                if let (TruncateOffset::Barrier { .. }, Some(start_time)) =
                    (persisted_offset, start_time)
                {
                    sink_writer_metrics
                        .sink_commit_duration
                        .observe(start_time.elapsed().as_millis() as f64);
                }

                log_reader.truncate(persisted_offset)?;
                Ok(())
            }

            // Offset of the last item sent, used to check that offsets arrive in order.
            let mut prev_offset: Option<TruncateOffset> = None;
            // Push from back and pop from front
            let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                VecDeque::new();

            loop {
                // Wait for whichever comes first: a writer response or the next log item.
                // The future that did not finish is dropped.
                let either_result: futures::future::Either<
                    Option<SinkWriterStreamResponse>,
                    LogStoreResult<(u64, LogStoreReadItem)>,
                > = drop_either_future(
                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                );
                match either_result {
                    // A response from the writer: acknowledge the matching offset.
                    futures::future::Either::Left(opt) => {
                        let response = opt.context("end of response stream")?;
                        match response {
                            // A batch was written: truncate at the corresponding chunk offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Batch(
                                        sink_writer_stream_response::BatchWrittenResponse {
                                            epoch,
                                            batch_id,
                                        },
                                    )),
                            } => {
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Chunk {
                                        epoch,
                                        chunk_id: batch_id as _,
                                    },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // A commit finished: truncate at the corresponding barrier offset.
                            SinkWriterStreamResponse {
                                response:
                                    Some(sink_writer_stream_response::Response::Commit(
                                        sink_writer_stream_response::CommitResponse {
                                            epoch,
                                            metadata,
                                        },
                                    )),
                            } => {
                                // Metadata is not expected here; it is only logged and then
                                // ignored.
                                if let Some(metadata) = metadata {
                                    warn!("get unexpected non-empty metadata: {:?}", metadata);
                                }
                                truncate_matched_offset(
                                    &mut sent_offset_queue,
                                    TruncateOffset::Barrier { epoch },
                                    &mut log_reader,
                                    &sink_writer_metrics,
                                )?;
                            }
                            // Any other response is treated as an error.
                            response => {
                                return Err(SinkError::Remote(anyhow!(
                                    "get unexpected response: {:?}",
                                    response
                                )));
                            }
                        }
                    }
                    // A new log item: send it to the writer and remember its offset.
                    futures::future::Either::Right(result) => {
                        let (epoch, item): (u64, LogStoreReadItem) = result?;

                        match item {
                            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                                let offset = TruncateOffset::Chunk { epoch, chunk_id };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                let cardinality = chunk.cardinality();
                                sink_writer_metrics
                                    .connector_sink_rows_received
                                    .inc_by(cardinality as _);

                                let chunk = self.stream_chunk_converter.convert_chunk(chunk)?;
                                request_tx
                                    .send_request(JniSinkWriterStreamRequest::Chunk {
                                        epoch,
                                        batch_id: chunk_id as u64,
                                        chunk,
                                    })
                                    .instrument_await(span!(
                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                    ))
                                    .await?;
                                prev_offset = Some(offset);
                                // Chunks are queued without a start time: they are not timed.
                                sent_offset_queue
                                    .push_back((TruncateOffset::Chunk { epoch, chunk_id }, None));
                            }
                            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                                let offset = TruncateOffset::Barrier { epoch };
                                if let Some(prev_offset) = &prev_offset {
                                    prev_offset.check_next_offset(offset)?;
                                }
                                // Checkpoint barriers record when they were sent so that the
                                // commit duration can be observed once acknowledged.
                                let start_time = if is_checkpoint {
                                    let start_time = Instant::now();
                                    request_tx
                                        .barrier(epoch, true)
                                        .instrument_await(span!(
                                            "log_sinker_commit_checkpoint (epoch {epoch})"
                                        ))
                                        .await?;
                                    Some(start_time)
                                } else {
                                    request_tx
                                        .barrier(epoch, false)
                                        .instrument_await(span!(
                                            "log_sinker_send_barrier (epoch {epoch})"
                                        ))
                                        .await?;
                                    None
                                };
                                prev_offset = Some(offset);
                                sent_offset_queue
                                    .push_back((TruncateOffset::Barrier { epoch }, start_time));
                            }
                        }
                    }
                }
            }
        };

        // Run both futures concurrently; the output of whichever finishes first is returned.
        select(pin!(poll_response_stream), pin!(poll_consume_log_and_sink))
            .await
            .factor_first()
            .0
    }
}
522
/// A remote sink for the variant `R` that reports itself as a coordinated sink and creates a
/// `RemoteCoordinator` to commit.
#[derive(Debug)]
pub struct CoordinatedRemoteSink<R: RemoteSinkTrait> {
    /// Parameters the sink was created from; cloned for writers and coordinators.
    param: SinkParam,
    /// Ties the sink to its variant `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
528
/// The secret-enforced properties of a `CoordinatedRemoteSink<R>` are exactly those declared
/// by `R`.
impl<R: RemoteSinkTrait> EnforceSecret for CoordinatedRemoteSink<R> {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = R::ENFORCE_SECRET_PROPERTIES;
}
532
533impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
534    type Error = SinkError;
535
536    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
537        Ok(Self {
538            param,
539            _phantom: PhantomData,
540        })
541    }
542}
543
544impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
545    type LogSinker = CoordinatedLogSinker<CoordinatedRemoteSinkWriter>;
546
547    const SINK_NAME: &'static str = R::SINK_NAME;
548
549    async fn validate(&self) -> Result<()> {
550        validate_remote_sink(&self.param, Self::SINK_NAME).await?;
551        Ok(())
552    }
553
554    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
555        let metrics = SinkWriterMetrics::new(&writer_param);
556        CoordinatedLogSinker::new(
557            &writer_param,
558            self.param.clone(),
559            CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?,
560            NonZero::new(1).unwrap(),
561        )
562        .await
563    }
564
565    fn is_coordinated_sink(&self) -> bool {
566        true
567    }
568
569    async fn new_coordinator(
570        &self,
571        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
572    ) -> Result<SinkCommitCoordinator> {
573        let coordinator = RemoteCoordinator::new::<R>(self.param.clone()).await?;
574        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
575    }
576}
577
/// Sink writer that forwards chunks and barriers to a JNI sink writer stream and returns the
/// commit metadata produced at checkpoints.
pub struct CoordinatedRemoteSinkWriter {
    /// Current epoch; `None` until `begin_epoch` has been called.
    epoch: Option<u64>,
    /// Id attached to the next chunk; incremented after each chunk sent.
    batch_id: u64,
    /// Bidirectional stream to the JNI sink writer.
    stream_handle: SinkWriterStreamHandle<JniSinkWriterStreamRequest>,
    /// Metrics updated when chunks are written.
    metrics: SinkWriterMetrics,
}
584
585impl CoordinatedRemoteSinkWriter {
586    pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result<Self> {
587        let sink_proto = param.to_proto();
588        let stream_handle = EmbeddedConnectorClient::new()?
589            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
590            .await?;
591
592        Ok(Self {
593            epoch: None,
594            batch_id: 0,
595            stream_handle,
596            metrics,
597        })
598    }
599
600    #[cfg(test)]
601    fn for_test(
602        response_receiver: Receiver<ConnectorResult<SinkWriterStreamResponse>>,
603        request_sender: tokio::sync::mpsc::Sender<JniSinkWriterStreamRequest>,
604    ) -> CoordinatedRemoteSinkWriter {
605        use futures::StreamExt;
606
607        let stream_handle = SinkWriterStreamHandle::for_test(
608            request_sender,
609            ReceiverStream::new(response_receiver)
610                .map_err(RpcError::from)
611                .boxed(),
612        );
613
614        CoordinatedRemoteSinkWriter {
615            epoch: None,
616            batch_id: 0,
617            stream_handle,
618            metrics: SinkWriterMetrics::for_test(),
619        }
620    }
621}
622
623#[async_trait]
624impl SinkWriter for CoordinatedRemoteSinkWriter {
625    type CommitMetadata = Option<SinkMetadata>;
626
627    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
628        let cardinality = chunk.cardinality();
629        self.metrics
630            .connector_sink_rows_received
631            .inc_by(cardinality as _);
632
633        let epoch = self.epoch.ok_or_else(|| {
634            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
635        })?;
636        let batch_id = self.batch_id;
637        self.stream_handle
638            .request_sender
639            .send_request(JniSinkWriterStreamRequest::Chunk {
640                chunk,
641                epoch,
642                batch_id,
643            })
644            .await?;
645        self.batch_id += 1;
646        Ok(())
647    }
648
649    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
650        self.epoch = Some(epoch);
651        Ok(())
652    }
653
654    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
655        let epoch = self.epoch.ok_or_else(|| {
656            SinkError::Remote(anyhow!("epoch has not been initialize, call `begin_epoch`"))
657        })?;
658        if is_checkpoint {
659            // TODO: add metrics to measure commit time
660            let rsp = self.stream_handle.commit(epoch).await?;
661            rsp.metadata
662                .ok_or_else(|| {
663                    SinkError::Remote(anyhow!(
664                        "get none metadata in commit response for coordinated sink writer"
665                    ))
666                })
667                .map(Some)
668        } else {
669            self.stream_handle.barrier(epoch).await?;
670            Ok(None)
671        }
672    }
673}
674
/// Commit coordinator that delegates commits to a JNI sink coordinator stream.
pub struct RemoteCoordinator {
    /// Bidirectional stream to the JNI sink coordinator.
    stream_handle: SinkCoordinatorStreamHandle,
}
678
679impl RemoteCoordinator {
680    pub async fn new<R: RemoteSinkTrait>(param: SinkParam) -> Result<Self> {
681        let stream_handle = EmbeddedConnectorClient::new()?
682            .start_sink_coordinator_stream(param.clone())
683            .await?;
684
685        tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,);
686
687        Ok(RemoteCoordinator { stream_handle })
688    }
689}
690
691#[async_trait]
692impl SinglePhaseCommitCoordinator for RemoteCoordinator {
693    async fn init(&mut self) -> Result<()> {
694        Ok(())
695    }
696
697    async fn commit(
698        &mut self,
699        epoch: u64,
700        metadata: Vec<SinkMetadata>,
701        schema_change: Option<PbSinkSchemaChange>,
702    ) -> Result<()> {
703        if let Some(schema_change) = schema_change {
704            return Err(anyhow!(
705                "remote coordinator not support schema change, but got: {:?}",
706                schema_change
707            )
708            .into());
709        }
710        Ok(self.stream_handle.commit(epoch, metadata).await?)
711    }
712}
713
/// Client that starts sink writer and sink coordinator streams served by Java handlers
/// running in the embedded JVM.
struct EmbeddedConnectorClient {
    /// Handle to the JVM obtained from `Jvm::get_or_init`.
    jvm: Jvm,
}
717
718impl EmbeddedConnectorClient {
719    fn new() -> Result<Self> {
720        let jvm = Jvm::get_or_init().context("failed to create EmbeddedConnectorClient")?;
721        Ok(EmbeddedConnectorClient { jvm })
722    }
723
724    async fn start_sink_writer_stream(
725        &self,
726        payload_schema: Option<TableSchema>,
727        sink_proto: PbSinkParam,
728    ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
729        let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
730            SinkWriterStreamRequest {
731                request: Some(SinkRequest::Start(StartSink {
732                    sink_param: Some(sink_proto),
733                    payload_schema,
734                })),
735            },
736            |rx| async move {
737                let rx = self.start_jvm_worker_thread(
738                    gen_class_name!(com.risingwave.connector.JniSinkWriterHandler),
739                    "runJniSinkWriterThread",
740                    rx,
741                );
742                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
743            },
744        )
745        .await?;
746
747        match first_rsp {
748            SinkWriterStreamResponse {
749                response: Some(sink_writer_stream_response::Response::Start(_)),
750            } => Ok(handle),
751            msg => Err(SinkError::Internal(anyhow!(
752                "should get start response but get {:?}",
753                msg
754            ))),
755        }
756    }
757
758    async fn start_sink_coordinator_stream(
759        &self,
760        param: SinkParam,
761    ) -> Result<SinkCoordinatorStreamHandle> {
762        let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
763            SinkCoordinatorStreamRequest {
764                request: Some(sink_coordinator_stream_request::Request::Start(
765                    StartCoordinator {
766                        param: Some(param.to_proto()),
767                    },
768                )),
769            },
770            |rx| async move {
771                let rx = self.start_jvm_worker_thread(
772                    gen_class_name!(com.risingwave.connector.JniSinkCoordinatorHandler),
773                    "runJniSinkCoordinatorThread",
774                    rx,
775                );
776                Ok(ReceiverStream::new(rx).map_err(RpcError::from))
777            },
778        )
779        .await?;
780
781        match first_rsp {
782            SinkCoordinatorStreamResponse {
783                response: Some(sink_coordinator_stream_response::Response::Start(_)),
784            } => Ok(handle),
785            msg => Err(SinkError::Internal(anyhow!(
786                "should get start response but get {:?}",
787                msg
788            ))),
789        }
790    }
791
    /// Spawns a dedicated OS thread that invokes the static Java method
    /// `class_name::method_name` through JNI, and returns the receiving half of a newly
    /// created response channel (capacity `DEFAULT_BUFFER_SIZE`).
    ///
    /// The Java method is invoked with the signature
    /// `(long requestRx, long responseTx) -> void`. The two arguments are raw pointers to
    /// `request_rx` and to the sending half of the response channel; both values are moved
    /// into, and owned by, the spawned thread.
    ///
    /// Error handling:
    /// - If the Java call returns an error, it is only logged; the JNI closure still
    ///   returns `Ok(())`.
    /// - If `execute_with_jni_env` itself returns an error, that error is sent to the
    ///   returned receiver on a best-effort basis (a failed send is ignored).
    ///
    /// The `JoinHandle` of the spawned thread is discarded, so the thread is never joined
    /// by this function.
    fn start_jvm_worker_thread<REQ: Send + 'static, RSP: Send + 'static>(
        &self,
        class_name: &'static str,
        method_name: &'static str,
        mut request_rx: JniReceiverType<REQ>,
    ) -> Receiver<std::result::Result<RSP, anyhow::Error>> {
        let (mut response_tx, response_rx): (JniSenderType<RSP>, _) =
            mpsc::channel(DEFAULT_BUFFER_SIZE);

        // Copy the JVM handle out of `self` so the `move` closure below does not capture `self`.
        let jvm = self.jvm;
        std::thread::spawn(move || {
            // `request_rx` and `response_tx` now live in this thread's closure; the pointers
            // handed to Java below refer to these thread-owned values.
            // NOTE(review): presumably the Java side only dereferences them (via native
            // callbacks) while this call is in progress — confirm against the JNI handlers.
            let result = execute_with_jni_env(jvm, |env| {
                let result = call_static_method!(
                    env,
                    class_name,
                    method_name,
                    {{void}, {long requestRx, long responseTx}},
                    &mut request_rx as *mut JniReceiverType<REQ>,
                    &mut response_tx as *mut JniSenderType<RSP>
                );

                // A failure of the Java call is logged here and not propagated further.
                match result {
                    Ok(_) => {
                        tracing::debug!("end of jni call {}::{}", class_name, method_name);
                    }
                    Err(e) => {
                        tracing::error!(error = %e.as_report(), "jni call error");
                    }
                };

                Ok(())
            });

            // Only errors from `execute_with_jni_env` itself reach this point; forward them
            // to the consumer and ignore a failed send.
            if let Err(e) = result {
                let _ = response_tx.blocking_send(Err(e));
            }
        });
        response_rx
    }
831}
832
#[cfg(test)]
mod test {
    use std::time::Duration;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_jni_core::JniSinkWriterStreamRequest;
    use risingwave_pb::connector_service::sink_writer_stream_request::{Barrier, Request};
    use risingwave_pb::connector_service::sink_writer_stream_response::{CommitResponse, Response};
    use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
    use tokio::sync::mpsc;

    use crate::sink::remote::CoordinatedRemoteSinkWriter;
    use crate::sink::writer::SinkWriter;

    /// `barrier` and `write_batch` must both fail when no epoch has been begun, and neither
    /// may push anything onto the request channel.
    #[tokio::test]
    async fn test_epoch_check() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        // No response is ever needed here, so the sending half is dropped immediately.
        let (_, response_rx) = mpsc::channel(16);

        let mut writer = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);
        let chunk = StreamChunk::from_pretty(
            " i T
            + 1 Ripper
        ",
        );

        // A barrier without a running epoch has to be rejected within the timeout.
        let barrier_outcome = tokio::time::timeout(Duration::from_secs(10), writer.barrier(true))
            .await
            .expect("test failed: should not commit without epoch");
        assert!(
            barrier_outcome.is_err(),
            "test failed: no epoch check for commit()"
        );
        assert!(
            request_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );

        // The same holds for writing a batch.
        let write_outcome = tokio::time::timeout(Duration::from_secs(1), writer.write_batch(chunk))
            .await
            .expect("test failed: should not write without epoch");
        assert!(
            write_outcome.is_err(),
            "test failed: no epoch check for write_batch()"
        );
        assert!(
            request_rx.try_recv().is_err(),
            "test failed: unchecked epoch before request"
        );
    }

    /// Drives the writer through two epochs and checks the requests it emits: a chunk
    /// write, a non-checkpoint barrier, and a second chunk write in the next epoch.
    #[tokio::test]
    async fn test_remote_sink() {
        let (request_tx, mut request_rx) = mpsc::channel(16);
        let (response_tx, response_rx) = mpsc::channel(16);
        let mut writer = CoordinatedRemoteSinkWriter::for_test(response_rx, request_tx);

        let chunk_a = StreamChunk::from_pretty(
            " i T
            + 1 Alice
            + 2 Bob
            + 3 Clare
        ",
        );
        let chunk_b = StreamChunk::from_pretty(
            " i T
            + 4 David
            + 5 Eve
            + 6 Frank
        ",
        );

        // First epoch: a single write produces a chunk request tagged with batch id 0.
        writer.begin_epoch(2022).await.unwrap();
        assert_eq!(writer.epoch, Some(2022));

        writer.write_batch(chunk_a.clone()).await.unwrap();
        assert_eq!(writer.epoch, Some(2022));
        assert_eq!(writer.batch_id, 1);
        let JniSinkWriterStreamRequest::Chunk {
            epoch: sent_epoch,
            batch_id: sent_batch_id,
            chunk: sent_chunk,
        } = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(sent_epoch, 2022);
        assert_eq!(sent_batch_id, 0);
        assert_eq!(sent_chunk, chunk_a);

        // Queue a commit response, then issue a non-checkpoint barrier and inspect the request.
        let commit_response = SinkWriterStreamResponse {
            response: Some(Response::Commit(CommitResponse {
                epoch: 2022,
                metadata: None,
            })),
        };
        response_tx
            .send(Ok(commit_response))
            .await
            .expect("test failed: failed to sync epoch");
        writer.barrier(false).await.unwrap();
        let JniSinkWriterStreamRequest::PbRequest(SinkWriterStreamRequest {
            request:
                Some(Request::Barrier(Barrier {
                    epoch: barrier_epoch,
                    is_checkpoint: false,
                })),
        }) = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct sync request ");
        };
        assert_eq!(barrier_epoch, 2022);

        // Second epoch: the batch id keeps increasing across epochs.
        writer.begin_epoch(2023).await.unwrap();
        assert_eq!(writer.epoch, Some(2023));

        writer.write_batch(chunk_b.clone()).await.unwrap();
        assert_eq!(writer.epoch, Some(2023));
        assert_eq!(writer.batch_id, 2);
        let JniSinkWriterStreamRequest::Chunk {
            epoch: sent_epoch,
            batch_id: sent_batch_id,
            chunk: sent_chunk,
        } = request_rx.recv().await.unwrap()
        else {
            panic!("test failed: failed to construct write request");
        };
        assert_eq!(sent_epoch, 2023);
        assert_eq!(sent_batch_id, 1);
        assert_eq!(sent_chunk, chunk_b);
    }
}