risingwave_rpc_client/
sink_coordinate_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::Field;
21use risingwave_pb::connector_service::coordinate_request::{
22    CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
23};
24use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
25use risingwave_pb::connector_service::{
26    CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
27    coordinate_response,
28};
29use risingwave_pb::stream_plan::PbSinkAddColumns;
30use tokio::sync::mpsc::Receiver;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::{Response, Status};
33
34use crate::error::RpcError;
35use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
36
37pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
38
39impl CoordinatorStreamHandle {
40    pub async fn new(
41        mut client: SinkCoordinationRpcClient,
42        param: PbSinkParam,
43        vnode_bitmap: Bitmap,
44    ) -> Result<(Self, Option<u64>), RpcError> {
45        let (instance, log_store_rewind_start_epoch) =
46            Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
47                client.coordinate(ReceiverStream::new(rx)).await
48            })
49            .await?;
50
51        Ok((instance, log_store_rewind_start_epoch))
52    }
53
54    pub async fn new_with_init_stream<F, St, Fut>(
55        param: PbSinkParam,
56        vnode_bitmap: Bitmap,
57        init_stream: F,
58    ) -> Result<(Self, Option<u64>), RpcError>
59    where
60        F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
61        St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
62        Fut: Future<Output = Result<Response<St>, Status>> + Send,
63    {
64        let (stream_handle, first_response) = BidiStreamHandle::initialize(
65            CoordinateRequest {
66                msg: Some(coordinate_request::Msg::StartRequest(
67                    StartCoordinationRequest {
68                        vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
69                        param: Some(param),
70                    },
71                )),
72            },
73            move |rx| async move {
74                init_stream(rx)
75                    .await
76                    .map(|response| {
77                        response
78                            .into_inner()
79                            .map_err(RpcError::from_connector_status)
80                    })
81                    .map_err(RpcError::from_connector_status)
82            },
83        )
84        .await?;
85
86        match first_response {
87            CoordinateResponse {
88                msg:
89                    Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
90                        log_store_rewind_start_epoch,
91                    })),
92            } => Ok((stream_handle, log_store_rewind_start_epoch)),
93            msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
94        }
95    }
96
97    pub async fn commit(
98        &mut self,
99        epoch: u64,
100        metadata: SinkMetadata,
101        add_columns: Option<Vec<Field>>,
102    ) -> anyhow::Result<()> {
103        self.send_request(CoordinateRequest {
104            msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
105                epoch,
106                metadata: Some(metadata),
107                add_columns: add_columns.map(|columns| PbSinkAddColumns {
108                    fields: columns.into_iter().map(|field| field.to_prost()).collect(),
109                }),
110            })),
111        })
112        .await?;
113        match self.next_response().await? {
114            CoordinateResponse {
115                msg: Some(coordinate_response::Msg::CommitResponse(_)),
116            } => Ok(()),
117            msg => Err(anyhow!("should get commit response but get {:?}", msg)),
118        }
119    }
120
121    pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<u64> {
122        self.send_request(CoordinateRequest {
123            msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
124                UpdateVnodeBitmapRequest {
125                    vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
126                },
127            )),
128        })
129        .await?;
130        match self.next_response().await? {
131            CoordinateResponse {
132                msg:
133                    Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
134                        log_store_rewind_start_epoch,
135                    })),
136            } => Ok(log_store_rewind_start_epoch
137                .ok_or_else(|| anyhow!("should get start epoch after update vnode bitmap"))?),
138            msg => Err(anyhow!("should get start response but get {:?}", msg)),
139        }
140    }
141
142    pub async fn stop(mut self) -> anyhow::Result<()> {
143        self.send_request(CoordinateRequest {
144            msg: Some(coordinate_request::Msg::Stop(true)),
145        })
146        .await?;
147        match self.next_response().await? {
148            CoordinateResponse {
149                msg: Some(coordinate_response::Msg::Stopped(_)),
150            } => Ok(()),
151            msg => Err(anyhow!("should get Stopped but get {:?}", msg)),
152        }
153    }
154
155    pub async fn align_initial_epoch(&mut self, initial_epoch: u64) -> anyhow::Result<u64> {
156        self.send_request(CoordinateRequest {
157            msg: Some(coordinate_request::Msg::AlignInitialEpochRequest(
158                initial_epoch,
159            )),
160        })
161        .await?;
162        match self.next_response().await? {
163            CoordinateResponse {
164                msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(epoch)),
165            } => Ok(epoch),
166            msg => Err(anyhow!(
167                "should get AlignInitialEpochResponse but get {:?}",
168                msg
169            )),
170        }
171    }
172}