risingwave_meta/manager/sink_coordination/
handle.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::task::{Context, Poll};
16
17use anyhow::anyhow;
18use futures::{TryStreamExt, ready};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_connector::sink::SinkParam;
21use risingwave_pb::connector_service::coordinate_response::{
22    CommitResponse, StartCoordinationResponse,
23};
24use risingwave_pb::connector_service::{
25    CoordinateResponse, coordinate_request, coordinate_response,
26};
27use tonic::Status;
28
29use crate::manager::sink_coordination::{SinkCoordinatorResponseSender, SinkWriterRequestStream};
30
31pub(super) struct SinkWriterCoordinationHandle {
32    request_stream: SinkWriterRequestStream,
33    response_tx: SinkCoordinatorResponseSender,
34    param: SinkParam,
35    vnode_bitmap: Bitmap,
36    prev_epoch: Option<u64>,
37}
38
39impl SinkWriterCoordinationHandle {
40    pub(super) fn new(
41        request_stream: SinkWriterRequestStream,
42        response_tx: SinkCoordinatorResponseSender,
43        param: SinkParam,
44        vnode_bitmap: Bitmap,
45    ) -> Self {
46        Self {
47            request_stream,
48            response_tx,
49            param,
50            vnode_bitmap,
51            prev_epoch: None,
52        }
53    }
54
55    pub(super) fn param(&self) -> &SinkParam {
56        &self.param
57    }
58
59    pub(super) fn vnode_bitmap(&self) -> &Bitmap {
60        &self.vnode_bitmap
61    }
62
63    pub(super) fn start(
64        &mut self,
65        log_store_rewind_start_epoch: Option<u64>,
66    ) -> anyhow::Result<()> {
67        self.response_tx
68            .send(Ok(CoordinateResponse {
69                msg: Some(coordinate_response::Msg::StartResponse(
70                    StartCoordinationResponse {
71                        log_store_rewind_start_epoch,
72                    },
73                )),
74            }))
75            .map_err(|_| anyhow!("fail to send start response"))
76    }
77
78    pub(super) fn ack_aligned_initial_epoch(
79        &mut self,
80        aligned_initial_epoch: u64,
81    ) -> anyhow::Result<()> {
82        self.response_tx
83            .send(Ok(CoordinateResponse {
84                msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(
85                    aligned_initial_epoch,
86                )),
87            }))
88            .map_err(|_| anyhow!("fail to send start response"))
89    }
90
91    pub(super) fn abort(self, status: Status) {
92        let _ = self.response_tx.send(Err(status));
93    }
94
95    pub(super) fn ack_commit(&mut self, epoch: u64) -> anyhow::Result<()> {
96        self.response_tx
97            .send(Ok(CoordinateResponse {
98                msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
99                    epoch,
100                })),
101            }))
102            .map_err(|_| anyhow!("fail to send commit response of epoch {}", epoch))
103    }
104
105    pub(super) fn stop(&mut self) -> anyhow::Result<()> {
106        self.response_tx
107            .send(Ok(CoordinateResponse {
108                msg: Some(coordinate_response::Msg::Stopped(true)),
109            }))
110            .map_err(|_| anyhow!("fail to send stopped response"))
111    }
112
113    pub(super) fn poll_next_request(
114        &mut self,
115        cx: &mut Context<'_>,
116    ) -> Poll<anyhow::Result<coordinate_request::Msg>> {
117        let result = try {
118            let request = ready!(self.request_stream.try_poll_next_unpin(cx))
119                .ok_or_else(|| anyhow!("end of request stream"))??;
120            let request = request.msg.ok_or_else(|| anyhow!("None msg in request"))?;
121            match &request {
122                coordinate_request::Msg::StartRequest(_)
123                | coordinate_request::Msg::Stop(_)
124                | coordinate_request::Msg::AlignInitialEpochRequest(_) => {}
125                coordinate_request::Msg::CommitRequest(request) => {
126                    if let Some(prev_epoch) = self.prev_epoch {
127                        if request.epoch < prev_epoch {
128                            return Poll::Ready(Err(anyhow!(
129                                "invalid commit epoch {}, prev_epoch {}",
130                                request.epoch,
131                                prev_epoch
132                            )));
133                        }
134                    }
135                    if request.metadata.is_none() {
136                        return Poll::Ready(Err(anyhow!("empty commit metadata")));
137                    };
138                    self.prev_epoch = Some(request.epoch);
139                }
140                coordinate_request::Msg::UpdateVnodeRequest(request) => {
141                    let bitmap = Bitmap::from(
142                        request
143                            .vnode_bitmap
144                            .as_ref()
145                            .ok_or_else(|| anyhow!("empty vnode bitmap"))?,
146                    );
147                    self.vnode_bitmap = bitmap;
148                }
149            };
150            request
151        };
152        Poll::Ready(result)
153    }
154}