risingwave_connector/sink/
mock_coordination_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bitmap::Bitmap;
16use risingwave_pb::connector_service::coordinate_response::{
17    self, CommitResponse, StartCoordinationResponse,
18};
19use risingwave_pb::connector_service::{
20    CoordinateRequest, CoordinateResponse, PbSinkParam, coordinate_request,
21};
22use risingwave_rpc_client::error::RpcError;
23use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
24use tokio::sync::mpsc::{self, Receiver};
25use tokio_stream::wrappers::ReceiverStream;
26use tonic::Status;
27
28use super::boxed::BoxCoordinator;
29use super::{BOUNDED_CHANNEL_SIZE, SinkParam};
30
31#[derive(Clone)]
32pub enum SinkCoordinationRpcClientEnum {
33    SinkCoordinationRpcClient(SinkCoordinationRpcClient),
34    MockSinkCoordinationRpcClient(MockSinkCoordinationRpcClient),
35}
36
37impl SinkCoordinationRpcClientEnum {
38    pub async fn new_stream_handle(
39        self,
40        param: &SinkParam,
41        vnode_bitmap: Bitmap,
42    ) -> super::Result<(CoordinatorStreamHandle, Option<u64>)> {
43        match self {
44            SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
45                sink_coordination_rpc_client,
46            ) => {
47                let (handle, log_store_rewind_start_epoch) = CoordinatorStreamHandle::new(
48                    sink_coordination_rpc_client,
49                    param.to_proto(),
50                    vnode_bitmap,
51                )
52                .await?;
53                Ok((handle, log_store_rewind_start_epoch))
54            }
55            SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
56                mock_sink_coordination_rpc_client,
57            ) => {
58                let handle = mock_sink_coordination_rpc_client
59                    .new_stream_handle(param.to_proto(), vnode_bitmap)
60                    .await?;
61                Ok((handle, None))
62            }
63        }
64    }
65}
66
67#[derive(Clone)]
68pub struct MockMetaClient {
69    mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
70}
71impl MockMetaClient {
72    pub fn new(mock_coordinator_committer: BoxCoordinator) -> Self {
73        Self {
74            mock_coordinator_committer: std::sync::Arc::new(tokio::sync::Mutex::new(
75                mock_coordinator_committer,
76            )),
77        }
78    }
79
80    pub fn sink_coordinate_client(&self) -> MockSinkCoordinationRpcClient {
81        MockSinkCoordinationRpcClient::new(self.mock_coordinator_committer.clone())
82    }
83}
84
85#[derive(Clone)]
86pub struct MockSinkCoordinationRpcClient {
87    mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
88}
89
90impl MockSinkCoordinationRpcClient {
91    pub fn new(
92        mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
93    ) -> Self {
94        Self {
95            mock_coordinator_committer,
96        }
97    }
98
99    pub async fn new_stream_handle(
100        &self,
101        param: PbSinkParam,
102        vnode_bitmap: Bitmap,
103    ) -> std::result::Result<CoordinatorStreamHandle, RpcError> {
104        let (res, _) =
105            CoordinatorStreamHandle::new_with_init_stream(param, vnode_bitmap, |rx| async move {
106                self.coordinate(rx).await
107            })
108            .await?;
109        Ok(res)
110    }
111
112    pub async fn coordinate(
113        &self,
114        mut receiver_stream: Receiver<CoordinateRequest>,
115    ) -> std::result::Result<
116        tonic::Response<ReceiverStream<std::result::Result<CoordinateResponse, tonic::Status>>>,
117        Status,
118    > {
119        match receiver_stream.try_recv() {
120            Ok(CoordinateRequest {
121                msg:
122                    Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest(
123                        coordinate_request::StartCoordinationRequest {
124                            param: Some(_param),
125                            vnode_bitmap: Some(_vnode_bitmap),
126                        },
127                    )),
128            }) => (),
129            msg => {
130                return Err(Status::invalid_argument(format!(
131                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
132                    msg
133                )));
134            }
135        };
136
137        let (response_tx, response_rx) =
138            mpsc::channel::<std::result::Result<CoordinateResponse, Status>>(BOUNDED_CHANNEL_SIZE);
139        let response_tx = std::sync::Arc::new(response_tx);
140        response_tx
141            .send(Ok(CoordinateResponse {
142                msg: Some(coordinate_response::Msg::StartResponse(
143                    StartCoordinationResponse {
144                        log_store_rewind_start_epoch: None,
145                    },
146                )),
147            }))
148            .await
149            .map_err(|e| Status::from_error(Box::new(e)))?;
150
151        let mock_coordinator_committer = self.mock_coordinator_committer.clone();
152        let response_tx_clone = response_tx.clone();
153        tokio::spawn(async move {
154            loop {
155                match receiver_stream.recv().await {
156                    Some(CoordinateRequest {
157                        msg:
158                            Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest {
159                                epoch,
160                                metadata,
161                            })),
162                    }) => {
163                        mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?;
164                        response_tx_clone.clone().send(Ok(CoordinateResponse {
165                            msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})),
166                        })).await.map_err(|e| Status::from_error(Box::new(e)))?;
167                    },
168                    msg => {
169                        return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(Status::invalid_argument(format!(
170                            "expected CoordinateRequest::CommitRequest , get {:?}",
171                            msg
172                        )));
173                    }
174                }
175            }
176        });
177
178        Ok(tonic::Response::new(ReceiverStream::new(response_rx)))
179    }
180}