risingwave_rpc_client/
connector_client.rs1use anyhow::anyhow;
16use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
17use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
18use risingwave_pb::connector_service::sink_writer_stream_request::{
19 Barrier, Request as SinkRequest, WriteBatch,
20};
21use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
22use risingwave_pb::connector_service::*;
23
24use crate::error::{Result, RpcError};
25use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};
26
27pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
28pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;
29
30pub type SinkWriterStreamHandle<REQ = SinkWriterStreamRequest> =
31 BidiStreamHandle<REQ, SinkWriterStreamResponse>;
32
33impl<REQ: From<SinkWriterStreamRequest>> SinkWriterRequestSender<REQ> {
34 pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
35 self.send_request(SinkWriterStreamRequest {
36 request: Some(SinkRequest::WriteBatch(WriteBatch {
37 epoch,
38 batch_id,
39 payload: Some(payload),
40 })),
41 })
42 .await
43 }
44
45 pub async fn barrier(&mut self, epoch: u64, is_checkpoint: bool) -> Result<()> {
46 self.send_request(SinkWriterStreamRequest {
47 request: Some(SinkRequest::Barrier(Barrier {
48 epoch,
49 is_checkpoint,
50 })),
51 })
52 .await
53 }
54}
55
56impl SinkWriterResponseReceiver {
57 pub async fn next_commit_response(&mut self) -> Result<CommitResponse> {
58 match self.next_response().await? {
59 SinkWriterStreamResponse {
60 response: Some(sink_writer_stream_response::Response::Commit(rsp)),
61 } => Ok(rsp),
62 msg => Err(RpcError::Internal(anyhow!(
63 "should get Sync response but get {:?}",
64 msg
65 ))),
66 }
67 }
68}
69
70impl<REQ: From<SinkWriterStreamRequest>> SinkWriterStreamHandle<REQ> {
71 pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
72 self.request_sender
73 .write_batch(epoch, batch_id, payload)
74 .await
75 }
76
77 pub async fn barrier(&mut self, epoch: u64) -> Result<()> {
78 self.request_sender.barrier(epoch, false).await
79 }
80
81 pub async fn commit(&mut self, epoch: u64) -> Result<CommitResponse> {
82 self.request_sender.barrier(epoch, true).await?;
83 self.response_stream.next_commit_response().await
84 }
85}
86
87pub type SinkCoordinatorStreamHandle =
88 BidiStreamHandle<SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse>;
89
90impl SinkCoordinatorStreamHandle {
91 pub async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
92 self.send_request(SinkCoordinatorStreamRequest {
93 request: Some(sink_coordinator_stream_request::Request::Commit(
94 CommitMetadata { epoch, metadata },
95 )),
96 })
97 .await?;
98 match self.next_response().await? {
99 SinkCoordinatorStreamResponse {
100 response:
101 Some(sink_coordinator_stream_response::Response::Commit(
102 sink_coordinator_stream_response::CommitResponse {
103 epoch: response_epoch,
104 },
105 )),
106 } => {
107 if epoch == response_epoch {
108 Ok(())
109 } else {
110 Err(RpcError::Internal(anyhow!(
111 "get different response epoch to commit epoch: {} {}",
112 epoch,
113 response_epoch
114 )))
115 }
116 }
117 msg => Err(RpcError::Internal(anyhow!(
118 "should get Commit response but get {:?}",
119 msg
120 ))),
121 }
122 }
123}