risingwave_rpc_client/
sink_coordinate_client.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use anyhow::anyhow;
18use futures::{Stream, TryStreamExt};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_pb::connector_service::coordinate_request::{
21    CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest,
22};
23use risingwave_pb::connector_service::coordinate_response::StartCoordinationResponse;
24use risingwave_pb::connector_service::{
25    CoordinateRequest, CoordinateResponse, PbSinkParam, SinkMetadata, coordinate_request,
26    coordinate_response,
27};
28use risingwave_pb::stream_plan::PbSinkSchemaChange;
29use tokio::sync::mpsc::Receiver;
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::{Response, Status};
32
33use crate::error::RpcError;
34use crate::{BidiStreamHandle, SinkCoordinationRpcClient};
35
36pub type CoordinatorStreamHandle = BidiStreamHandle<CoordinateRequest, CoordinateResponse>;
37
38impl CoordinatorStreamHandle {
39    pub async fn new(
40        mut client: SinkCoordinationRpcClient,
41        param: PbSinkParam,
42        vnode_bitmap: Bitmap,
43    ) -> Result<(Self, Option<u64>), RpcError> {
44        let (instance, log_store_rewind_start_epoch) =
45            Self::new_with_init_stream(param, vnode_bitmap, |rx| async move {
46                client.coordinate(ReceiverStream::new(rx)).await
47            })
48            .await?;
49
50        Ok((instance, log_store_rewind_start_epoch))
51    }
52
53    pub async fn new_with_init_stream<F, St, Fut>(
54        param: PbSinkParam,
55        vnode_bitmap: Bitmap,
56        init_stream: F,
57    ) -> Result<(Self, Option<u64>), RpcError>
58    where
59        F: FnOnce(Receiver<CoordinateRequest>) -> Fut + Send,
60        St: Stream<Item = Result<CoordinateResponse, Status>> + Send + Unpin + 'static,
61        Fut: Future<Output = Result<Response<St>, Status>> + Send,
62    {
63        let (stream_handle, first_response) = BidiStreamHandle::initialize(
64            CoordinateRequest {
65                msg: Some(coordinate_request::Msg::StartRequest(
66                    StartCoordinationRequest {
67                        vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
68                        param: Some(param),
69                    },
70                )),
71            },
72            move |rx| async move {
73                init_stream(rx)
74                    .await
75                    .map(|response| {
76                        response
77                            .into_inner()
78                            .map_err(RpcError::from_connector_status)
79                    })
80                    .map_err(RpcError::from_connector_status)
81            },
82        )
83        .await?;
84
85        match first_response {
86            CoordinateResponse {
87                msg:
88                    Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
89                        log_store_rewind_start_epoch,
90                    })),
91            } => Ok((stream_handle, log_store_rewind_start_epoch)),
92            msg => Err(anyhow!("should get start response but get {:?}", msg).into()),
93        }
94    }
95
96    pub async fn commit(
97        &mut self,
98        epoch: u64,
99        metadata: SinkMetadata,
100        schema_change: Option<PbSinkSchemaChange>,
101    ) -> anyhow::Result<()> {
102        self.send_request(CoordinateRequest {
103            msg: Some(coordinate_request::Msg::CommitRequest(CommitRequest {
104                epoch,
105                metadata: Some(metadata),
106                schema_change,
107            })),
108        })
109        .await?;
110        match self.next_response().await? {
111            CoordinateResponse {
112                msg: Some(coordinate_response::Msg::CommitResponse(_)),
113            } => Ok(()),
114            msg => Err(anyhow!("should get commit response but get {:?}", msg)),
115        }
116    }
117
118    pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<u64> {
119        self.send_request(CoordinateRequest {
120            msg: Some(coordinate_request::Msg::UpdateVnodeRequest(
121                UpdateVnodeBitmapRequest {
122                    vnode_bitmap: Some(vnode_bitmap.to_protobuf()),
123                },
124            )),
125        })
126        .await?;
127        match self.next_response().await? {
128            CoordinateResponse {
129                msg:
130                    Some(coordinate_response::Msg::StartResponse(StartCoordinationResponse {
131                        log_store_rewind_start_epoch,
132                    })),
133            } => Ok(log_store_rewind_start_epoch
134                .ok_or_else(|| anyhow!("should get start epoch after update vnode bitmap"))?),
135            msg => Err(anyhow!("should get start response but get {:?}", msg)),
136        }
137    }
138
139    pub async fn stop(mut self) -> anyhow::Result<()> {
140        self.send_request(CoordinateRequest {
141            msg: Some(coordinate_request::Msg::Stop(true)),
142        })
143        .await?;
144        match self.next_response().await? {
145            CoordinateResponse {
146                msg: Some(coordinate_response::Msg::Stopped(_)),
147            } => Ok(()),
148            msg => Err(anyhow!("should get Stopped but get {:?}", msg)),
149        }
150    }
151
152    pub async fn align_initial_epoch(&mut self, initial_epoch: u64) -> anyhow::Result<u64> {
153        self.send_request(CoordinateRequest {
154            msg: Some(coordinate_request::Msg::AlignInitialEpochRequest(
155                initial_epoch,
156            )),
157        })
158        .await?;
159        match self.next_response().await? {
160            CoordinateResponse {
161                msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(epoch)),
162            } => Ok(epoch),
163            msg => Err(anyhow!(
164                "should get AlignInitialEpochResponse but get {:?}",
165                msg
166            )),
167        }
168    }
169}