risingwave_rpc_client/
connector_client.rs1use anyhow::anyhow;
16use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
17use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
18use risingwave_pb::connector_service::sink_writer_stream_request::{
19 Barrier, Request as SinkRequest, WriteBatch,
20};
21use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
22use risingwave_pb::connector_service::*;
23
24use crate::error::{Result, RpcError};
25use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};
26
27pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
28pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;
29
30pub type SinkWriterStreamHandle<REQ = SinkWriterStreamRequest> =
31 BidiStreamHandle<REQ, SinkWriterStreamResponse>;
32
33impl<REQ: From<SinkWriterStreamRequest>> SinkWriterRequestSender<REQ> {
34 pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
35 self.send_request(SinkWriterStreamRequest {
36 request: Some(SinkRequest::WriteBatch(WriteBatch {
37 epoch,
38 batch_id,
39 payload: Some(payload),
40 })),
41 })
42 .await
43 }
44
45 pub async fn barrier(&mut self, epoch: u64, is_checkpoint: bool) -> Result<()> {
46 self.send_request(SinkWriterStreamRequest {
47 request: Some(SinkRequest::Barrier(Barrier {
48 epoch,
49 is_checkpoint,
50 })),
51 })
52 .await
53 }
54}
55
56impl SinkWriterResponseReceiver {
57 pub async fn next_commit_response(&mut self) -> Result<CommitResponse> {
58 loop {
59 match self.next_response().await? {
60 SinkWriterStreamResponse {
61 response: Some(sink_writer_stream_response::Response::Commit(rsp)),
62 } => return Ok(rsp),
63 SinkWriterStreamResponse {
64 response: Some(sink_writer_stream_response::Response::Batch(_)),
65 } => continue,
66 msg => {
67 return Err(RpcError::Internal(anyhow!(
68 "should get Sync response but get {:?}",
69 msg
70 )));
71 }
72 }
73 }
74 }
75}
76
77impl<REQ: From<SinkWriterStreamRequest>> SinkWriterStreamHandle<REQ> {
78 pub async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> {
79 self.request_sender
80 .write_batch(epoch, batch_id, payload)
81 .await
82 }
83
84 pub async fn barrier(&mut self, epoch: u64) -> Result<()> {
85 self.request_sender.barrier(epoch, false).await
86 }
87
88 pub async fn commit(&mut self, epoch: u64) -> Result<CommitResponse> {
89 self.request_sender.barrier(epoch, true).await?;
90 self.response_stream.next_commit_response().await
91 }
92}
93
94pub type SinkCoordinatorStreamHandle =
95 BidiStreamHandle<SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse>;
96
97impl SinkCoordinatorStreamHandle {
98 pub async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
99 self.send_request(SinkCoordinatorStreamRequest {
100 request: Some(sink_coordinator_stream_request::Request::Commit(
101 CommitMetadata { epoch, metadata },
102 )),
103 })
104 .await?;
105 match self.next_response().await? {
106 SinkCoordinatorStreamResponse {
107 response:
108 Some(sink_coordinator_stream_response::Response::Commit(
109 sink_coordinator_stream_response::CommitResponse {
110 epoch: response_epoch,
111 },
112 )),
113 } => {
114 if epoch == response_epoch {
115 Ok(())
116 } else {
117 Err(RpcError::Internal(anyhow!(
118 "get different response epoch to commit epoch: {} {}",
119 epoch,
120 response_epoch
121 )))
122 }
123 }
124 msg => Err(RpcError::Internal(anyhow!(
125 "should get Commit response but get {:?}",
126 msg
127 ))),
128 }
129 }
130}