risingwave_rpc_client/
sink_coordinate_client.rs1use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::Field;
21use risingwave_pb::connector_service::coordinate_request::{
22 CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
23};
24use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
25use risingwave_pb::connector_service::{
26 CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
27 coordinate_response,
28};
29use risingwave_pb::stream_plan::PbSinkAddColumns;
30use tokio::sync::mpsc::Receiver;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::{Response, Status};
33
34use crate::error::RpcError;
35use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
36
37pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
38
39impl CoordinatorStreamHandle {
40 pub async fn new(
41 mut client: SinkCoordinationRpcClient,
42 param: PbSinkParam,
43 vnode_bitmap: Bitmap,
44 ) -> Result<(Self, Option<u64>), RpcError> {
45 let (instance, log_store_rewind_start_epoch) =
46 Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
47 client.coordinate(ReceiverStream::new(rx)).await
48 })
49 .await?;
50
51 Ok((instance, log_store_rewind_start_epoch))
52 }
53
54 pub async fn new_with_init_stream<F, St, Fut>(
55 param: PbSinkParam,
56 vnode_bitmap: Bitmap,
57 init_stream: F,
58 ) -> Result<(Self, Option<u64>), RpcError>
59 where
60 F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
61 St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
62 Fut: Future<Output = Result<Response<St>, Status>> + Send,
63 {
64 let (stream_handle, first_response) = BidiStreamHandle::initialize(
65 CoordinateRequest {
66 msg: Some(coordinate_request::Msg::StartRequest(
67 StartCoordinationRequest {
68 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
69 param: Some(param),
70 },
71 )),
72 },
73 move |rx| async move {
74 init_stream(rx)
75 .await
76 .map(|response| {
77 response
78 .into_inner()
79 .map_err(RpcError::from_connector_status)
80 })
81 .map_err(RpcError::from_connector_status)
82 },
83 )
84 .await?;
85
86 match first_response {
87 CoordinateResponse {
88 msg:
89 Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
90 log_store_rewind_start_epoch,
91 })),
92 } => Ok((stream_handle, log_store_rewind_start_epoch)),
93 msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
94 }
95 }
96
97 pub async fn commit(
98 &mut self,
99 epoch: u64,
100 metadata: SinkMetadata,
101 add_columns: Option<Vec<Field>>,
102 ) -> anyhow::Result<()> {
103 self.send_request(CoordinateRequest {
104 msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
105 epoch,
106 metadata: Some(metadata),
107 add_columns: add_columns.map(|columns| PbSinkAddColumns {
108 fields: columns.into_iter().map(|field| field.to_prost()).collect(),
109 }),
110 })),
111 })
112 .await?;
113 match self.next_response().await? {
114 CoordinateResponse {
115 msg: Some(coordinate_response::Msg::CommitResponse(_)),
116 } => Ok(()),
117 msg => Err(anyhow!("should get commit response but get {:?}", msg)),
118 }
119 }
120
121 pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<u64> {
122 self.send_request(CoordinateRequest {
123 msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
124 UpdateVnodeBitmapRequest {
125 vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
126 },
127 )),
128 })
129 .await?;
130 match self.next_response().await? {
131 CoordinateResponse {
132 msg:
133 Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
134 log_store_rewind_start_epoch,
135 })),
136 } => Ok(log_store_rewind_start_epoch
137 .ok_or_else(|| anyhow!("should get start epoch after update vnode bitmap"))?),
138 msg => Err(anyhow!("should get start response but get {:?}", msg)),
139 }
140 }
141
142 pub async fn stop(mut self) -> anyhow::Result<()> {
143 self.send_request(CoordinateRequest {
144 msg: Some(coordinate_request::Msg::Stop(true)),
145 })
146 .await?;
147 match self.next_response().await? {
148 CoordinateResponse {
149 msg: Some(coordinate_response::Msg::Stopped(_)),
150 } => Ok(()),
151 msg => Err(anyhow!("should get Stopped but get {:?}", msg)),
152 }
153 }
154
155 pub async fn align_initial_epoch(&mut self, initial_epoch: u64) -> anyhow::Result<u64> {
156 self.send_request(CoordinateRequest {
157 msg: Some(coordinate_request::Msg::AlignInitialEpochRequest(
158 initial_epoch,
159 )),
160 })
161 .await?;
162 match self.next_response().await? {
163 CoordinateResponse {
164 msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(epoch)),
165 } => Ok(epoch),
166 msg => Err(anyhow!(
167 "should get AlignInitialEpochResponse but get {:?}",
168 msg
169 )),
170 }
171 }
172}