risingwave_rpc_client/
sink_coordinate_client.rs1use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_pb::connector_service::coordinate_request::{
21 CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
22};
23use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
24use risingwave_pb::connector_service::{
25 CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
26 coordinate_response,
27};
28use tokio::sync::mpsc::Receiver;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Response, Status};
31
32use crate::error::RpcError;
33use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
34
/// A [`BidiStreamHandle`] specialized for sink coordination: requests sent through it are
/// [`CoordinateRequest`] messages and responses read from it are [`CoordinateResponse`] messages.
pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
36
37impl CoordinatorStreamHandle {
38 pub async fn new(
39 mut client: SinkCoordinationRpcClient,
40 param: PbSinkParam,
41 vnode_bitmap: Bitmap,
42 ) -> Result<(Self, Option<u64>), RpcError> {
43 let (instance, log_store_rewind_start_epoch) =
44 Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
45 client.coordinate(ReceiverStream::new(rx)).await
46 })
47 .await?;
48
49 Ok((instance, log_store_rewind_start_epoch))
50 }
51
52 pub async fn new_with_init_stream<F, St, Fut>(
53 param: PbSinkParam,
54 vnode_bitmap: Bitmap,
55 init_stream: F,
56 ) -> Result<(Self, Option<u64>), RpcError>
57 where
58 F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
59 St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
60 Fut: Future<Output = Result<Response<St>, Status>> + Send,
61 {
62 let (stream_handle, first_response) = BidiStreamHandle::initialize(
63 CoordinateRequest {
64 msg: Some(coordinate_request::Msg::StartRequest(
65 StartCoordinationRequest {
66 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
67 param: Some(param),
68 },
69 )),
70 },
71 move |rx| async move {
72 init_stream(rx)
73 .await
74 .map(|response| {
75 response
76 .into_inner()
77 .map_err(RpcError::from_connector_status)
78 })
79 .map_err(RpcError::from_connector_status)
80 },
81 )
82 .await?;
83
84 match first_response {
85 CoordinateResponse {
86 msg:
87 Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
88 log_store_rewind_start_epoch,
89 })),
90 } => Ok((stream_handle, log_store_rewind_start_epoch)),
91 msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
92 }
93 }
94
95 pub async fn commit(&mut self, epoch: u64, metadata: SinkMetadata) -> anyhow::Result<()> {
96 self.send_request(CoordinateRequest {
97 msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
98 epoch,
99 metadata: Some(metadata),
100 })),
101 })
102 .await?;
103 match self.next_response().await? {
104 CoordinateResponse {
105 msg: Some(coordinate_response::Msg::CommitResponse(_)),
106 } => Ok(()),
107 msg => Err(anyhow!("should get commit response but get {:?}", msg)),
108 }
109 }
110
111 pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<()> {
112 self.send_request(CoordinateRequest {
113 msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
114 UpdateVnodeBitmapRequest {
115 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
116 },
117 )),
118 })
119 .await?;
120 Ok(())
121 }
122
123 pub async fn stop(mut self) -> anyhow::Result<()> {
124 self.send_request(CoordinateRequest {
125 msg: Some(coordinate_request::Msg::Stop(true)),
126 })
127 .await?;
128 Ok(())
129 }
130}