risingwave_connector/sink/
mock_coordination_client.rs1use risingwave_common::bitmap::Bitmap;
16use risingwave_pb::connector_service::coordinate_response::{
17 self, CommitResponse, StartCoordinationResponse,
18};
19use risingwave_pb::connector_service::{
20 CoordinateRequest, CoordinateResponse, PbSinkParam, coordinate_request,
21};
22use risingwave_rpc_client::error::RpcError;
23use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
24use tokio::sync::mpsc::{self, Receiver};
25use tokio_stream::wrappers::ReceiverStream;
26use tonic::Status;
27
28use super::boxed::BoxCoordinator;
29use super::{BOUNDED_CHANNEL_SIZE, SinkParam};
30
31#[derive(Clone)]
32pub enum SinkCoordinationRpcClientEnum {
33 SinkCoordinationRpcClient(SinkCoordinationRpcClient),
34 MockSinkCoordinationRpcClient(MockSinkCoordinationRpcClient),
35}
36
37impl SinkCoordinationRpcClientEnum {
38 pub async fn new_stream_handle(
39 self,
40 param: &SinkParam,
41 vnode_bitmap: Bitmap,
42 ) -> super::Result<(CoordinatorStreamHandle, Option<u64>)> {
43 match self {
44 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
45 sink_coordination_rpc_client,
46 ) => {
47 let (handle, log_store_rewind_start_epoch) = CoordinatorStreamHandle::new(
48 sink_coordination_rpc_client,
49 param.to_proto(),
50 vnode_bitmap,
51 )
52 .await?;
53 Ok((handle, log_store_rewind_start_epoch))
54 }
55 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
56 mock_sink_coordination_rpc_client,
57 ) => {
58 let handle = mock_sink_coordination_rpc_client
59 .new_stream_handle(param.to_proto(), vnode_bitmap)
60 .await?;
61 Ok((handle, None))
62 }
63 }
64 }
65}
66
67#[derive(Clone)]
68pub struct MockMetaClient {
69 mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
70}
71impl MockMetaClient {
72 pub fn new(mock_coordinator_committer: BoxCoordinator) -> Self {
73 Self {
74 mock_coordinator_committer: std::sync::Arc::new(tokio::sync::Mutex::new(
75 mock_coordinator_committer,
76 )),
77 }
78 }
79
80 pub fn sink_coordinate_client(&self) -> MockSinkCoordinationRpcClient {
81 MockSinkCoordinationRpcClient::new(self.mock_coordinator_committer.clone())
82 }
83}
84
85#[derive(Clone)]
86pub struct MockSinkCoordinationRpcClient {
87 mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
88}
89
90impl MockSinkCoordinationRpcClient {
91 pub fn new(
92 mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
93 ) -> Self {
94 Self {
95 mock_coordinator_committer,
96 }
97 }
98
99 pub async fn new_stream_handle(
100 &self,
101 param: PbSinkParam,
102 vnode_bitmap: Bitmap,
103 ) -> std::result::Result<CoordinatorStreamHandle, RpcError> {
104 let (res, _) =
105 CoordinatorStreamHandle::new_with_init_stream(param, vnode_bitmap, |rx| async move {
106 self.coordinate(rx).await
107 })
108 .await?;
109 Ok(res)
110 }
111
112 pub async fn coordinate(
113 &self,
114 mut receiver_stream: Receiver<CoordinateRequest>,
115 ) -> std::result::Result<
116 tonic::Response<ReceiverStream<std::result::Result<CoordinateResponse, tonic::Status>>>,
117 Status,
118 > {
119 match receiver_stream.try_recv() {
120 Ok(CoordinateRequest {
121 msg:
122 Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest(
123 coordinate_request::StartCoordinationRequest {
124 param: Some(_param),
125 vnode_bitmap: Some(_vnode_bitmap),
126 },
127 )),
128 }) => (),
129 msg => {
130 return Err(Status::invalid_argument(format!(
131 "expected CoordinateRequest::StartRequest in the first request, get {:?}",
132 msg
133 )));
134 }
135 };
136
137 let (response_tx, response_rx) =
138 mpsc::channel::<std::result::Result<CoordinateResponse, Status>>(BOUNDED_CHANNEL_SIZE);
139 let response_tx = std::sync::Arc::new(response_tx);
140 response_tx
141 .send(Ok(CoordinateResponse {
142 msg: Some(coordinate_response::Msg::StartResponse(
143 StartCoordinationResponse {
144 log_store_rewind_start_epoch: None,
145 },
146 )),
147 }))
148 .await
149 .map_err(|e| Status::from_error(Box::new(e)))?;
150
151 let mock_coordinator_committer = self.mock_coordinator_committer.clone();
152 let response_tx_clone = response_tx.clone();
153 tokio::spawn(async move {
154 loop {
155 match receiver_stream.recv().await {
156 Some(CoordinateRequest {
157 msg:
158 Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest {
159 epoch,
160 metadata,
161 })),
162 }) => {
163 mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?;
164 response_tx_clone.clone().send(Ok(CoordinateResponse {
165 msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})),
166 })).await.map_err(|e| Status::from_error(Box::new(e)))?;
167 },
168 msg => {
169 return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(Status::invalid_argument(format!(
170 "expected CoordinateRequest::CommitRequest , get {:?}",
171 msg
172 )));
173 }
174 }
175 }
176 });
177
178 Ok(tonic::Response::new(ReceiverStream::new(response_rx)))
179 }
180}