risingwave_meta/manager/sink_coordination/
handle.rs1use std::task::{Context, Poll};
16
17use anyhow::anyhow;
18use futures::{TryStreamExt, ready};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_connector::sink::SinkParam;
21use risingwave_pb::connector_service::coordinate_response::{
22 CommitResponse, StartCoordinationResponse,
23};
24use risingwave_pb::connector_service::{
25 CoordinateResponse, coordinate_request, coordinate_response,
26};
27use tonic::Status;
28
29use crate::manager::sink_coordination::{SinkCoordinatorResponseSender, SinkWriterRequestStream};
30
31pub(super) struct SinkWriterCoordinationHandle {
32 request_stream: SinkWriterRequestStream,
33 response_tx: SinkCoordinatorResponseSender,
34 param: SinkParam,
35 vnode_bitmap: Bitmap,
36 prev_epoch: Option<u64>,
37}
38
39impl SinkWriterCoordinationHandle {
40 pub(super) fn new(
41 request_stream: SinkWriterRequestStream,
42 response_tx: SinkCoordinatorResponseSender,
43 param: SinkParam,
44 vnode_bitmap: Bitmap,
45 ) -> Self {
46 Self {
47 request_stream,
48 response_tx,
49 param,
50 vnode_bitmap,
51 prev_epoch: None,
52 }
53 }
54
55 pub(super) fn param(&self) -> &SinkParam {
56 &self.param
57 }
58
59 pub(super) fn vnode_bitmap(&self) -> &Bitmap {
60 &self.vnode_bitmap
61 }
62
63 pub(super) fn start(
64 &mut self,
65 log_store_rewind_start_epoch: Option<u64>,
66 ) -> anyhow::Result<()> {
67 self.response_tx
68 .send(Ok(CoordinateResponse {
69 msg: Some(coordinate_response::Msg::StartResponse(
70 StartCoordinationResponse {
71 log_store_rewind_start_epoch,
72 },
73 )),
74 }))
75 .map_err(|_| anyhow!("fail to send start response"))
76 }
77
78 pub(super) fn ack_aligned_initial_epoch(
79 &mut self,
80 aligned_initial_epoch: u64,
81 ) -> anyhow::Result<()> {
82 self.response_tx
83 .send(Ok(CoordinateResponse {
84 msg: Some(coordinate_response::Msg::AlignInitialEpochResponse(
85 aligned_initial_epoch,
86 )),
87 }))
88 .map_err(|_| anyhow!("fail to send start response"))
89 }
90
91 pub(super) fn abort(self, status: Status) {
92 let _ = self.response_tx.send(Err(status));
93 }
94
95 pub(super) fn ack_commit(&mut self, epoch: u64) -> anyhow::Result<()> {
96 self.response_tx
97 .send(Ok(CoordinateResponse {
98 msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
99 epoch,
100 })),
101 }))
102 .map_err(|_| anyhow!("fail to send commit response of epoch {}", epoch))
103 }
104
105 pub(super) fn stop(&mut self) -> anyhow::Result<()> {
106 self.response_tx
107 .send(Ok(CoordinateResponse {
108 msg: Some(coordinate_response::Msg::Stopped(true)),
109 }))
110 .map_err(|_| anyhow!("fail to send stopped response"))
111 }
112
113 pub(super) fn poll_next_request(
114 &mut self,
115 cx: &mut Context<'_>,
116 ) -> Poll<anyhow::Result<coordinate_request::Msg>> {
117 let result = try {
118 let request = ready!(self.request_stream.try_poll_next_unpin(cx))
119 .ok_or_else(|| anyhow!("end of request stream"))??;
120 let request = request.msg.ok_or_else(|| anyhow!("None msg in request"))?;
121 match &request {
122 coordinate_request::Msg::StartRequest(_)
123 | coordinate_request::Msg::Stop(_)
124 | coordinate_request::Msg::AlignInitialEpochRequest(_) => {}
125 coordinate_request::Msg::CommitRequest(request) => {
126 if let Some(prev_epoch) = self.prev_epoch {
127 if request.epoch < prev_epoch {
128 return Poll::Ready(Err(anyhow!(
129 "invalid commit epoch {}, prev_epoch {}",
130 request.epoch,
131 prev_epoch
132 )));
133 }
134 }
135 if request.metadata.is_none() {
136 return Poll::Ready(Err(anyhow!("empty commit metadata")));
137 };
138 self.prev_epoch = Some(request.epoch);
139 }
140 coordinate_request::Msg::UpdateVnodeRequest(request) => {
141 let bitmap = Bitmap::from(
142 request
143 .vnode_bitmap
144 .as_ref()
145 .ok_or_else(|| anyhow!("empty vnode bitmap"))?,
146 );
147 self.vnode_bitmap = bitmap;
148 }
149 };
150 request
151 };
152 Poll::Ready(result)
153 }
154}