risingwave_rpc_client/
connector_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
17use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
18use risingwave_pb::connector_service::sink_writer_stream_request::{
19    Barrier, Request as SinkRequest, WriteBatch,
20};
21use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
22use risingwave_pb::connector_service::*;
23
24use crate::error::{Result, RpcError};
25use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};
26
27pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
28pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;
29
30pub type SinkWriterStreamHandle<REQ = SinkWriterStreamRequest> =
31    BidiStreamHandle<REQ, SinkWriterStreamResponse>;
32
33impl<REQ: From<SinkWriterStreamRequest>> SinkWriterRequestSender<REQ> {
34    pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
35        self.send_request(SinkWriterStreamRequest {
36            request: Some(SinkRequest::WriteBatch(WriteBatch {
37                epoch,
38                batch_id,
39                payload: Some(payload),
40            })),
41        })
42        .await
43    }
44
45    pub async fn barrier(&mut self, epoch: u64, is_checkpoint: bool) -> Result<()> {
46        self.send_request(SinkWriterStreamRequest {
47            request: Some(SinkRequest::Barrier(Barrier {
48                epoch,
49                is_checkpoint,
50            })),
51        })
52        .await
53    }
54}
55
56impl SinkWriterResponseReceiver {
57    pub async fn next_commit_response(&mut self) -> Result<CommitResponse> {
58        match self.next_response().await? {
59            SinkWriterStreamResponse {
60                response: Some(sink_writer_stream_response::Response::Commit(rsp)),
61            } => Ok(rsp),
62            msg => Err(RpcError::Internal(anyhow!(
63                "should get Sync response but get {:?}",
64                msg
65            ))),
66        }
67    }
68}
69
70impl<REQ: From<SinkWriterStreamRequest>> SinkWriterStreamHandle<REQ> {
71    pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
72        self.request_sender
73            .write_batch(epoch, batch_id, payload)
74            .await
75    }
76
77    pub async fn barrier(&mut self, epoch: u64) -> Result<()> {
78        self.request_sender.barrier(epoch, false).await
79    }
80
81    pub async fn commit(&mut self, epoch: u64) -> Result<CommitResponse> {
82        self.request_sender.barrier(epoch, true).await?;
83        self.response_stream.next_commit_response().await
84    }
85}
86
87pub type SinkCoordinatorStreamHandle =
88    BidiStreamHandle<SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse>;
89
90impl SinkCoordinatorStreamHandle {
91    pub async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
92        self.send_request(SinkCoordinatorStreamRequest {
93            request: Some(sink_coordinator_stream_request::Request::Commit(
94                CommitMetadata { epoch, metadata },
95            )),
96        })
97        .await?;
98        match self.next_response().await? {
99            SinkCoordinatorStreamResponse {
100                response:
101                    Some(sink_coordinator_stream_response::Response::Commit(
102                        sink_coordinator_stream_response::CommitResponse {
103                            epoch: response_epoch,
104                        },
105                    )),
106            } => {
107                if epoch == response_epoch {
108                    Ok(())
109                } else {
110                    Err(RpcError::Internal(anyhow!(
111                        "get different response epoch to commit epoch: {} {}",
112                        epoch,
113                        response_epoch
114                    )))
115                }
116            }
117            msg => Err(RpcError::Internal(anyhow!(
118                "should get Commit response but get {:?}",
119                msg
120            ))),
121        }
122    }
123}