risingwave_meta/manager/sink_coordination/
handle.rs1use std::pin::pin;
16use std::task::{Context, Poll};
17
18use anyhow::anyhow;
19use futures::{Future, TryStreamExt};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_connector::sink::SinkParam;
22use risingwave_pb::connector_service::coordinate_response::{
23 CommitResponse, StartCoordinationResponse,
24};
25use risingwave_pb::connector_service::{
26 CoordinateResponse, SinkMetadata, coordinate_request, coordinate_response,
27};
28use tonic::Status;
29
30use crate::manager::sink_coordination::{SinkCoordinatorResponseSender, SinkWriterRequestStream};
31
/// Handle that pairs a sink writer's incoming request stream with the channel
/// used to send responses (or a terminal error status) back to that writer.
pub(super) struct SinkWriterCoordinationHandle {
    /// Stream of coordinate requests read by `next_commit_request`.
    request_stream: SinkWriterRequestStream,
    /// Sender for `CoordinateResponse` messages, or a `Status` on abort.
    response_tx: SinkCoordinatorResponseSender,
    /// Sink parameters supplied at construction; exposed read-only via `param()`.
    param: SinkParam,
    /// Current vnode bitmap; replaced whenever an `UpdateVnodeRequest` arrives
    /// on the request stream.
    vnode_bitmap: Bitmap,
    /// Epoch of the most recently accepted commit request, or `None` before the
    /// first one. Used to reject commits whose epoch goes backwards.
    prev_epoch: Option<u64>,
}
39
40impl SinkWriterCoordinationHandle {
41 pub(super) fn new(
42 request_stream: SinkWriterRequestStream,
43 response_tx: SinkCoordinatorResponseSender,
44 param: SinkParam,
45 vnode_bitmap: Bitmap,
46 ) -> Self {
47 Self {
48 request_stream,
49 response_tx,
50 param,
51 vnode_bitmap,
52 prev_epoch: None,
53 }
54 }
55
56 pub(super) fn param(&self) -> &SinkParam {
57 &self.param
58 }
59
60 pub(super) fn vnode_bitmap(&self) -> &Bitmap {
61 &self.vnode_bitmap
62 }
63
64 pub(super) fn start(
65 &mut self,
66 log_store_rewind_start_epoch: Option<u64>,
67 ) -> anyhow::Result<()> {
68 self.response_tx
69 .send(Ok(CoordinateResponse {
70 msg: Some(coordinate_response::Msg::StartResponse(
71 StartCoordinationResponse {
72 log_store_rewind_start_epoch,
73 },
74 )),
75 }))
76 .map_err(|_| anyhow!("fail to send start response"))
77 }
78
79 pub(super) fn abort(self, status: Status) {
80 let _ = self.response_tx.send(Err(status));
81 }
82
83 pub(super) fn ack_commit(&mut self, epoch: u64) -> anyhow::Result<()> {
84 self.response_tx
85 .send(Ok(CoordinateResponse {
86 msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
87 epoch,
88 })),
89 }))
90 .map_err(|_| anyhow!("fail to send commit response of epoch {}", epoch))
91 }
92
93 pub(super) fn poll_next_commit_request(
94 &mut self,
95 cx: &mut Context<'_>,
96 ) -> Poll<anyhow::Result<Option<(u64, SinkMetadata)>>> {
97 let future = self.next_commit_request();
98 let future = pin!(future);
99 future.poll(cx)
100 }
101
102 async fn next_commit_request(&mut self) -> anyhow::Result<Option<(u64, SinkMetadata)>> {
103 loop {
104 let request = self
105 .request_stream
106 .try_next()
107 .await?
108 .ok_or_else(|| anyhow!("end of request stream"))?;
109 match request.msg.ok_or_else(|| anyhow!("None msg in request"))? {
110 coordinate_request::Msg::StartRequest(_) => {
111 return Err(anyhow!("should have started"));
112 }
113 coordinate_request::Msg::CommitRequest(request) => {
114 if let Some(prev_epoch) = self.prev_epoch {
115 if request.epoch < prev_epoch {
116 return Err(anyhow!(
117 "invalid commit epoch {}, prev_epoch {}",
118 request.epoch,
119 prev_epoch
120 ));
121 }
122 }
123 let Some(metadata) = request.metadata else {
124 return Err(anyhow!("empty commit metadata"));
125 };
126 self.prev_epoch = Some(request.epoch);
127 return Ok(Some((request.epoch, metadata)));
128 }
129 coordinate_request::Msg::UpdateVnodeRequest(request) => {
130 let bitmap = Bitmap::from(
131 &request
132 .vnode_bitmap
133 .ok_or_else(|| anyhow!("empty vnode bitmap"))?,
134 );
135 self.vnode_bitmap = bitmap;
136 continue;
137 }
138 coordinate_request::Msg::Stop(_) => {
139 return Ok(None);
140 }
141 }
142 }
143 }
144}