risingwave_rpc_client/
sink_coordinate_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_pb::connector_service::coordinate_request::{
21    CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
22};
23use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
24use risingwave_pb::connector_service::{
25    CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
26    coordinate_response,
27};
28use tokio::sync::mpsc::Receiver;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Response, Status};
31
32use crate::error::RpcError;
33use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
34
35pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
36
37impl CoordinatorStreamHandle {
38    pub async fn new(
39        mut client: SinkCoordinationRpcClient,
40        param: PbSinkParam,
41        vnode_bitmap: Bitmap,
42    ) -> Result<(Self, Option<u64>), RpcError> {
43        let (instance, log_store_rewind_start_epoch) =
44            Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
45                client.coordinate(ReceiverStream::new(rx)).await
46            })
47            .await?;
48
49        Ok((instance, log_store_rewind_start_epoch))
50    }
51
52    pub async fn new_with_init_stream<F, St, Fut>(
53        param: PbSinkParam,
54        vnode_bitmap: Bitmap,
55        init_stream: F,
56    ) -> Result<(Self, Option<u64>), RpcError>
57    where
58        F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
59        St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
60        Fut: Future<Output = Result<Response<St>, Status>> + Send,
61    {
62        let (stream_handle, first_response) = BidiStreamHandle::initialize(
63            CoordinateRequest {
64                msg: Some(coordinate_request::Msg::StartRequest(
65                    StartCoordinationRequest {
66                        vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
67                        param: Some(param),
68                    },
69                )),
70            },
71            move |rx| async move {
72                init_stream(rx)
73                    .await
74                    .map(|response| {
75                        response
76                            .into_inner()
77                            .map_err(RpcError::from_connector_status)
78                    })
79                    .map_err(RpcError::from_connector_status)
80            },
81        )
82        .await?;
83
84        match first_response {
85            CoordinateResponse {
86                msg:
87                    Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
88                        log_store_rewind_start_epoch,
89                    })),
90            } => Ok((stream_handle, log_store_rewind_start_epoch)),
91            msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
92        }
93    }
94
95    pub async fn commit(&mut self, epoch: u64, metadata: SinkMetadata) -> anyhow::Result<()> {
96        self.send_request(CoordinateRequest {
97            msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
98                epoch,
99                metadata: Some(metadata),
100            })),
101        })
102        .await?;
103        match self.next_response().await? {
104            CoordinateResponse {
105                msg: Some(coordinate_response::Msg::CommitResponse(_)),
106            } => Ok(()),
107            msg => Err(anyhow!("should get commit response but get {:?}", msg)),
108        }
109    }
110
111    pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<()> {
112        self.send_request(CoordinateRequest {
113            msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
114                UpdateVnodeBitmapRequest {
115                    vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
116                },
117            )),
118        })
119        .await?;
120        Ok(())
121    }
122
123    pub async fn stop(mut self) -> anyhow::Result<()> {
124        self.send_request(CoordinateRequest {
125            msg: Some(coordinate_request::Msg::Stop(true)),
126        })
127        .await?;
128        Ok(())
129    }
130}