risingwave_rpc_client/
connector_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
17use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
18use risingwave_pb::connector_service::sink_writer_stream_request::{
19    Barrier, Request as SinkRequest, WriteBatch,
20};
21use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
22use risingwave_pb::connector_service::*;
23
24use crate::error::{Result, RpcError};
25use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};
26
27pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
28pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;
29
30pub type SinkWriterStreamHandle<REQ = SinkWriterStreamRequest> =
31    BidiStreamHandle<REQ, SinkWriterStreamResponse>;
32
33impl<REQ: From<SinkWriterStreamRequest>> SinkWriterRequestSender<REQ> {
34    pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
35        self.send_request(SinkWriterStreamRequest {
36            request: Some(SinkRequest::WriteBatch(WriteBatch {
37                epoch,
38                batch_id,
39                payload: Some(payload),
40            })),
41        })
42        .await
43    }
44
45    pub async fn barrier(&mut self, epoch: u64, is_checkpoint: bool) -> Result<()> {
46        self.send_request(SinkWriterStreamRequest {
47            request: Some(SinkRequest::Barrier(Barrier {
48                epoch,
49                is_checkpoint,
50            })),
51        })
52        .await
53    }
54}
55
56impl SinkWriterResponseReceiver {
57    pub async fn next_commit_response(&mut self) -> Result<CommitResponse> {
58        loop {
59            match self.next_response().await? {
60                SinkWriterStreamResponse {
61                    response: Some(sink_writer_stream_response::Response::Commit(rsp)),
62                } => return Ok(rsp),
63                SinkWriterStreamResponse {
64                    response: Some(sink_writer_stream_response::Response::Batch(_)),
65                } => continue,
66                msg => {
67                    return Err(RpcError::Internal(anyhow!(
68                        "should get Sync response but get {:?}",
69                        msg
70                    )));
71                }
72            }
73        }
74    }
75}
76
77impl<REQ: From<SinkWriterStreamRequest>> SinkWriterStreamHandle<REQ> {
78    pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
79        self.request_sender
80            .write_batch(epoch, batch_id, payload)
81            .await
82    }
83
84    pub async fn barrier(&mut self, epoch: u64) -> Result<()> {
85        self.request_sender.barrier(epoch, false).await
86    }
87
88    pub async fn commit(&mut self, epoch: u64) -> Result<CommitResponse> {
89        self.request_sender.barrier(epoch, true).await?;
90        self.response_stream.next_commit_response().await
91    }
92}
93
94pub type SinkCoordinatorStreamHandle =
95    BidiStreamHandle<SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse>;
96
97impl SinkCoordinatorStreamHandle {
98    pub async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
99        self.send_request(SinkCoordinatorStreamRequest {
100            request: Some(sink_coordinator_stream_request::Request::Commit(
101                CommitMetadata { epoch, metadata },
102            )),
103        })
104        .await?;
105        match self.next_response().await? {
106            SinkCoordinatorStreamResponse {
107                response:
108                    Some(sink_coordinator_stream_response::Response::Commit(
109                        sink_coordinator_stream_response::CommitResponse {
110                            epoch: response_epoch,
111                        },
112                    )),
113            } => {
114                if epoch == response_epoch {
115                    Ok(())
116                } else {
117                    Err(RpcError::Internal(anyhow!(
118                        "get different response epoch to commit epoch: {} {}",
119                        epoch,
120                        response_epoch
121                    )))
122                }
123            }
124            msg => Err(RpcError::Internal(anyhow!(
125                "should get Commit response but get {:?}",
126                msg
127            ))),
128        }
129    }
130}