risingwave_rpc_client/
sink_coordinate_client.rs1use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_pb::connector_service::coordinate_request::{
21 CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
22};
23use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
24use risingwave_pb::connector_service::{
25 CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
26 coordinate_response,
27};
28use risingwave_pb::stream_plan::PbSinkSchemaChange;
29use tokio::sync::mpsc::Receiver;
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::{Response, Status};
32
33use crate::error::RpcError;
34use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
35
36pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
37
38impl CoordinatorStreamHandle {
39 pub async fn new(
40 mut client: SinkCoordinationRpcClient,
41 param: PbSinkParam,
42 vnode_bitmap: Bitmap,
43 ) -> Result<(Self, Option<u64>), RpcError> {
44 let (instance, log_store_rewind_start_epoch) =
45 Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
46 client.coordinate(ReceiverStream::new(rx)).await
47 })
48 .await?;
49
50 Ok((instance, log_store_rewind_start_epoch))
51 }
52
53 pub async fn new_with_init_stream<F, St, Fut>(
54 param: PbSinkParam,
55 vnode_bitmap: Bitmap,
56 init_stream: F,
57 ) -> Result<(Self, Option<u64>), RpcError>
58 where
59 F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
60 St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
61 Fut: Future<Output = Result<Response<St>, Status>> + Send,
62 {
63 let (stream_handle, first_response) = BidiStreamHandle::initialize(
64 CoordinateRequest {
65 msg: Some(coordinate_request::Msg::StartRequest(
66 StartCoordinationRequest {
67 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
68 param: Some(param),
69 },
70 )),
71 },
72 move |rx| async move {
73 init_stream(rx)
74 .await
75 .map(|response| {
76 response
77 .into_inner()
78 .map_err(RpcError::from_connector_status)
79 })
80 .map_err(RpcError::from_connector_status)
81 },
82 )
83 .await?;
84
85 match first_response {
86 CoordinateResponse {
87 msg:
88 Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
89 log_store_rewind_start_epoch,
90 })),
91 } => Ok((stream_handle, log_store_rewind_start_epoch)),
92 msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
93 }
94 }
95
96 pub async fn commit(
97 &mut self,
98 epoch: u64,
99 metadata: SinkMetadata,
100 schema_change: Option<PbSinkSchemaChange>,
101 ) -> anyhow::Result<()> {
102 self.send_request(CoordinateRequest {
103 msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
104 epoch,
105 metadata: Some(metadata),
106 schema_change,
107 })),
108 })
109 .await?;
110 match self.next_response().await? {
111 CoordinateResponse {
112 msg: Some(coordinate_response::Msg::CommitResponse(_)),
113 } => Ok(()),
114 msg => Err(anyhow!("should get commit response but get {:?}", msg)),
115 }
116 }
117
118 pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<u64> {
119 self.send_request(CoordinateRequest {
120 msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
121 UpdateVnodeBitmapRequest {
122 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
123 },
124 )),
125 })
126 .await?;
127 match self.next_response().await? {
128 CoordinateResponse {
129 msg:
130 Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
131 log_store_rewind_start_epoch,
132 })),
133 } => Ok(log_store_rewind_start_epoch
134 .ok_or_else(|| anyhow!("should get start epoch after update vnode bitmap"))?),
135 msg => Err(anyhow!("should get start response but get {:?}", msg)),
136 }
137 }
138
139 pub async fn stop(mut self) -> anyhow::Result<()> {
140 self.send_request(CoordinateRequest {
141 msg: Some(coordinate_request::Msg::Stop(true)),
142 })
143 .await?;
144 match self.next_response().await? {
145 CoordinateResponse {
146 msg: Some(coordinate_response::Msg::Stopped(_)),
147 } => Ok(()),
148 msg => Err(anyhow!("should get Stopped but get {:?}", msg)),
149 }
150 }
151
152 pub async fn align_initial_epoch(&mut self, initial_epoch: u64) -> anyhow::Result<u64> {
153 self.send_request(CoordinateRequest {
154 msg: Some(coordinate_request::Msg::AlignInitialEpochRequest(
155 initial_epoch,
156 )),
157 })
158 .await?;
159 match self.next_response().await? {
160 CoordinateResponse {
161 msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(epoch)),
162 } => Ok(epoch),
163 msg => Err(anyhow!(
164 "should get AlignInitialEpochResponse but get {:?}",
165 msg
166 )),
167 }
168 }
169}