risingwave_meta/manager/sink_coordination/
handle.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::pin;
16use std::task::{Context, Poll};
17
18use anyhow::anyhow;
19use futures::{Future, TryStreamExt};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_connector::sink::SinkParam;
22use risingwave_pb::connector_service::coordinate_response::{
23    CommitResponse, StartCoordinationResponse,
24};
25use risingwave_pb::connector_service::{
26    CoordinateResponse, SinkMetadata, coordinate_request, coordinate_response,
27};
28use tonic::Status;
29
30use crate::manager::sink_coordination::{SinkCoordinatorResponseSender, SinkWriterRequestStream};
31
32pub(super) struct SinkWriterCoordinationHandle {
33    request_stream: SinkWriterRequestStream,
34    response_tx: SinkCoordinatorResponseSender,
35    param: SinkParam,
36    vnode_bitmap: Bitmap,
37    prev_epoch: Option<u64>,
38}
39
40impl SinkWriterCoordinationHandle {
41    pub(super) fn new(
42        request_stream: SinkWriterRequestStream,
43        response_tx: SinkCoordinatorResponseSender,
44        param: SinkParam,
45        vnode_bitmap: Bitmap,
46    ) -> Self {
47        Self {
48            request_stream,
49            response_tx,
50            param,
51            vnode_bitmap,
52            prev_epoch: None,
53        }
54    }
55
56    pub(super) fn param(&self) -> &SinkParam {
57        &self.param
58    }
59
60    pub(super) fn vnode_bitmap(&self) -> &Bitmap {
61        &self.vnode_bitmap
62    }
63
64    pub(super) fn start(
65        &mut self,
66        log_store_rewind_start_epoch: Option<u64>,
67    ) -> anyhow::Result<()> {
68        self.response_tx
69            .send(Ok(CoordinateResponse {
70                msg: Some(coordinate_response::Msg::StartResponse(
71                    StartCoordinationResponse {
72                        log_store_rewind_start_epoch,
73                    },
74                )),
75            }))
76            .map_err(|_| anyhow!("fail to send start response"))
77    }
78
79    pub(super) fn abort(self, status: Status) {
80        let _ = self.response_tx.send(Err(status));
81    }
82
83    pub(super) fn ack_commit(&mut self, epoch: u64) -> anyhow::Result<()> {
84        self.response_tx
85            .send(Ok(CoordinateResponse {
86                msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
87                    epoch,
88                })),
89            }))
90            .map_err(|_| anyhow!("fail to send commit response of epoch {}", epoch))
91    }
92
93    pub(super) fn poll_next_commit_request(
94        &mut self,
95        cx: &mut Context<'_>,
96    ) -> Poll<anyhow::Result<Option<(u64, SinkMetadata)>>> {
97        let future = self.next_commit_request();
98        let future = pin!(future);
99        future.poll(cx)
100    }
101
102    async fn next_commit_request(&mut self) -> anyhow::Result<Option<(u64, SinkMetadata)>> {
103        loop {
104            let request = self
105                .request_stream
106                .try_next()
107                .await?
108                .ok_or_else(|| anyhow!("end of request stream"))?;
109            match request.msg.ok_or_else(|| anyhow!("None msg in request"))? {
110                coordinate_request::Msg::StartRequest(_) => {
111                    return Err(anyhow!("should have started"));
112                }
113                coordinate_request::Msg::CommitRequest(request) => {
114                    if let Some(prev_epoch) = self.prev_epoch {
115                        if request.epoch < prev_epoch {
116                            return Err(anyhow!(
117                                "invalid commit epoch {}, prev_epoch {}",
118                                request.epoch,
119                                prev_epoch
120                            ));
121                        }
122                    }
123                    let Some(metadata) = request.metadata else {
124                        return Err(anyhow!("empty commit metadata"));
125                    };
126                    self.prev_epoch = Some(request.epoch);
127                    return Ok(Some((request.epoch, metadata)));
128                }
129                coordinate_request::Msg::UpdateVnodeRequest(request) => {
130                    let bitmap = Bitmap::from(
131                        &request
132                            .vnode_bitmap
133                            .ok_or_else(|| anyhow!("empty vnode bitmap"))?,
134                    );
135                    self.vnode_bitmap = bitmap;
136                    continue;
137                }
138                coordinate_request::Msg::Stop(_) => {
139                    return Ok(None);
140                }
141            }
142        }
143    }
144}