risingwave_meta/manager/sink_coordination/
coordinator_worker.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::future::{Future, poll_fn};
17use std::pin::pin;
18use std::task::Poll;
19use std::time::{Duration, Instant};
20
21use anyhow::anyhow;
22use futures::future::{Either, select};
23use futures::pin_mut;
24use itertools::Itertools;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_connector::dispatch_sink;
27use risingwave_connector::sink::{
28 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
29};
30use risingwave_pb::connector_service::SinkMetadata;
31use sea_orm::DatabaseConnection;
32use thiserror_ext::AsReport;
33use tokio::select;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::time::sleep;
36use tonic::Status;
37use tracing::{error, warn};
38
39use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
40
41async fn run_future_with_periodic_fn<F: Future>(
42 future: F,
43 interval: Duration,
44 mut f: impl FnMut(),
45) -> F::Output {
46 pin_mut!(future);
47 loop {
48 match select(&mut future, pin!(sleep(interval))).await {
49 Either::Left((output, _)) => {
50 break output;
51 }
52 Either::Right(_) => f(),
53 }
54 }
55}
56
57struct EpochCommitRequests {
58 epoch: u64,
59 metadatas: Vec<SinkMetadata>,
60 handle_ids: HashSet<usize>,
61 committed_bitmap: Option<Bitmap>, }
63
64impl EpochCommitRequests {
65 fn new(epoch: u64) -> Self {
66 Self {
67 epoch,
68 metadatas: vec![],
69 handle_ids: Default::default(),
70 committed_bitmap: None,
71 }
72 }
73
74 fn add_new_request(
75 &mut self,
76 handle_id: usize,
77 metadata: SinkMetadata,
78 vnode_bitmap: Bitmap,
79 ) -> anyhow::Result<()> {
80 let committed_bitmap = self
81 .committed_bitmap
82 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
83 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
84
85 self.metadatas.push(metadata);
86 assert!(self.handle_ids.insert(handle_id));
87 let check_bitmap = (&*committed_bitmap) & &vnode_bitmap;
88 if check_bitmap.count_ones() > 0 {
89 return Err(anyhow!(
90 "duplicate vnode {:?} on epoch {}. request vnode: {:?}, prev vnode: {:?}",
91 check_bitmap.iter_ones().collect_vec(),
92 self.epoch,
93 vnode_bitmap,
94 committed_bitmap
95 ));
96 }
97 *committed_bitmap |= &vnode_bitmap;
98 Ok(())
99 }
100
101 fn can_commit(&self) -> bool {
102 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
103 }
104}
105
106struct CoordinationHandleManager {
107 param: SinkParam,
108 writer_handles: HashMap<usize, SinkWriterCoordinationHandle>,
109 next_handle_id: usize,
110 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
111 initial_log_store_rewind_start_epoch: Option<u64>,
112}
113
114impl CoordinationHandleManager {
115 fn ack_commit(
116 &mut self,
117 epoch: u64,
118 handle_ids: impl IntoIterator<Item = usize>,
119 ) -> anyhow::Result<()> {
120 for handle_id in handle_ids {
121 let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
122 anyhow!(
123 "fail to find handle for {} when ack commit on epoch {}",
124 handle_id,
125 epoch
126 )
127 })?;
128 handle.ack_commit(epoch).map_err(|_| {
129 anyhow!(
130 "fail to ack commit on epoch {} for handle {}",
131 epoch,
132 handle_id
133 )
134 })?;
135 }
136 Ok(())
137 }
138
139 async fn next_commit_request_inner(
140 writer_handles: &mut HashMap<usize, SinkWriterCoordinationHandle>,
141 ) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
142 poll_fn(|cx| {
143 'outer: loop {
144 for (handle_id, handle) in writer_handles.iter_mut() {
145 if let Poll::Ready(result) = handle.poll_next_commit_request(cx) {
146 match result {
147 Ok(Some((epoch, metadata))) => {
148 return Poll::Ready(Ok((
149 *handle_id,
150 handle.vnode_bitmap().clone(),
151 epoch,
152 metadata,
153 )));
154 }
155 Ok(None) => {
156 let handle_id = *handle_id;
157 writer_handles.remove(&handle_id);
158 continue 'outer;
159 }
160 Err(e) => {
161 return Poll::Ready(Err(e));
162 }
163 }
164 }
165 }
166 return Poll::Pending;
167 }
168 })
169 .await
170 }
171
172 async fn next_commit_request(&mut self) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
173 loop {
174 select! {
175 handle = self.request_rx.recv() => {
176 let mut handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
177 if handle.param() != &self.param {
178 warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
179 }
180 handle.start(self.initial_log_store_rewind_start_epoch)?;
181 let handle_id = self.next_handle_id;
182 self.next_handle_id += 1;
183 self.writer_handles.insert(handle_id, handle);
184 }
185 result = Self::next_commit_request_inner(&mut self.writer_handles) => {
186 break result;
187 }
188 }
189 }
190 }
191}
192
193pub struct CoordinatorWorker {
194 handle_manager: CoordinationHandleManager,
195 pending_epochs: BTreeMap<u64, EpochCommitRequests>,
196}
197
198impl CoordinatorWorker {
199 pub async fn run(
200 param: SinkParam,
201 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
202 db: DatabaseConnection,
203 subscriber: SinkCommittedEpochSubscriber,
204 ) {
205 let sink = match build_sink(param.clone()) {
206 Ok(sink) => sink,
207 Err(e) => {
208 error!(
209 error = %e.as_report(),
210 "unable to build sink with param {:?}",
211 param
212 );
213 return;
214 }
215 };
216
217 dispatch_sink!(sink, sink, {
218 let coordinator = match sink.new_coordinator(db).await {
219 Ok(coordinator) => coordinator,
220 Err(e) => {
221 error!(
222 error = %e.as_report(),
223 "unable to build coordinator with param {:?}",
224 param
225 );
226 return;
227 }
228 };
229 Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
230 });
231 }
232
233 pub async fn execute_coordinator(
234 param: SinkParam,
235 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
236 coordinator: impl SinkCommitCoordinator,
237 subscriber: SinkCommittedEpochSubscriber,
238 ) {
239 let mut worker = CoordinatorWorker {
240 handle_manager: CoordinationHandleManager {
241 param,
242 writer_handles: HashMap::new(),
243 next_handle_id: 0,
244 request_rx,
245 initial_log_store_rewind_start_epoch: None,
246 },
247 pending_epochs: Default::default(),
248 };
249
250 if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
251 for handle in worker.handle_manager.writer_handles.into_values() {
252 handle.abort(Status::internal(format!(
253 "failed to run coordination: {:?}",
254 e.as_report()
255 )))
256 }
257 }
258 }
259
260 async fn run_coordination(
261 &mut self,
262 mut coordinator: impl SinkCommitCoordinator,
263 subscriber: SinkCommittedEpochSubscriber,
264 ) -> anyhow::Result<()> {
265 self.handle_manager.initial_log_store_rewind_start_epoch =
266 coordinator.init(subscriber).await?;
267 loop {
268 let (handle_id, vnode_bitmap, epoch, metadata) =
269 self.handle_manager.next_commit_request().await?;
270 self.pending_epochs
271 .entry(epoch)
272 .or_insert_with(|| EpochCommitRequests::new(epoch))
273 .add_new_request(handle_id, metadata, vnode_bitmap)?;
274 if self
275 .pending_epochs
276 .first_key_value()
277 .expect("non-empty")
278 .1
279 .can_commit()
280 {
281 let (epoch, requests) = self.pending_epochs.pop_first().expect("non-empty");
282
283 let start_time = Instant::now();
284 run_future_with_periodic_fn(
285 coordinator.commit(epoch, requests.metadatas),
286 Duration::from_secs(5),
287 || {
288 warn!(
289 elapsed = ?start_time.elapsed(),
290 sink_id = self.handle_manager.param.sink_id.sink_id,
291 "committing"
292 );
293 },
294 )
295 .await
296 .map_err(|e| anyhow!(e))?;
297 self.handle_manager.ack_commit(epoch, requests.handle_ids)?;
298 }
299 }
300 }
301}