1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use futures::future::{Either, select};
24use futures::pin_mut;
25use itertools::Itertools;
26use risingwave_common::bail;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
29use risingwave_connector::dispatch_sink;
30use risingwave_connector::sink::{
31 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
32};
33use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
34use sea_orm::DatabaseConnection;
35use thiserror_ext::AsReport;
36use tokio::select;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
38use tokio::time::sleep;
39use tonic::Status;
40use tracing::{error, warn};
41
42use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
43
44async fn run_future_with_periodic_fn<F: Future>(
45 future: F,
46 interval: Duration,
47 mut f: impl FnMut(),
48) -> F::Output {
49 pin_mut!(future);
50 loop {
51 match select(&mut future, pin!(sleep(interval))).await {
52 Either::Left((output, _)) => {
53 break output;
54 }
55 Either::Right(_) => f(),
56 }
57 }
58}
59
60type HandleId = usize;
61
62#[derive(Default)]
63struct AligningRequests<R> {
64 requests: Vec<R>,
65 handle_ids: HashSet<HandleId>,
66 committed_bitmap: Option<Bitmap>, }
68
69impl<R> AligningRequests<R> {
70 fn add_new_request(
71 &mut self,
72 handle_id: HandleId,
73 request: R,
74 vnode_bitmap: &Bitmap,
75 ) -> anyhow::Result<()>
76 where
77 R: Debug,
78 {
79 let committed_bitmap = self
80 .committed_bitmap
81 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
82 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
83
84 let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
85 if check_bitmap.count_ones() > 0 {
86 return Err(anyhow!(
87 "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
88 check_bitmap.iter_ones().collect_vec(),
89 vnode_bitmap,
90 committed_bitmap,
91 self.requests,
92 request
93 ));
94 }
95 *committed_bitmap |= vnode_bitmap;
96 self.requests.push(request);
97 assert!(self.handle_ids.insert(handle_id));
98 Ok(())
99 }
100
101 fn aligned(&self) -> bool {
102 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
103 }
104}
105
106struct CoordinationHandleManager {
107 param: SinkParam,
108 writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
109 next_handle_id: HandleId,
110 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
111}
112
113impl CoordinationHandleManager {
114 fn start(
115 &mut self,
116 log_store_rewind_start_epoch: Option<u64>,
117 handle_ids: impl IntoIterator<Item = HandleId>,
118 ) -> anyhow::Result<()> {
119 for handle_id in handle_ids {
120 let handle = self
121 .writer_handles
122 .get_mut(&handle_id)
123 .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
124 handle.start(log_store_rewind_start_epoch).map_err(|_| {
125 anyhow!(
126 "fail to start {:?} for handle {}",
127 log_store_rewind_start_epoch,
128 handle_id
129 )
130 })?;
131 }
132 Ok(())
133 }
134
135 fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
136 for (handle_id, handle) in &mut self.writer_handles {
137 handle
138 .ack_aligned_initial_epoch(aligned_initial_epoch)
139 .map_err(|_| {
140 anyhow!(
141 "fail to ack_aligned_initial_epoch {:?} for handle {}",
142 aligned_initial_epoch,
143 handle_id
144 )
145 })?;
146 }
147 Ok(())
148 }
149
150 fn ack_commit(
151 &mut self,
152 epoch: u64,
153 handle_ids: impl IntoIterator<Item = HandleId>,
154 ) -> anyhow::Result<()> {
155 for handle_id in handle_ids {
156 let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
157 anyhow!(
158 "fail to find handle for {} when ack commit on epoch {}",
159 handle_id,
160 epoch
161 )
162 })?;
163 handle.ack_commit(epoch).map_err(|_| {
164 anyhow!(
165 "fail to ack commit on epoch {} for handle {}",
166 epoch,
167 handle_id
168 )
169 })?;
170 }
171 Ok(())
172 }
173
174 async fn next_request_inner(
175 writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
176 ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
177 poll_fn(|cx| {
178 for (handle_id, handle) in writer_handles.iter_mut() {
179 if let Poll::Ready(result) = handle.poll_next_request(cx) {
180 return Poll::Ready(result.map(|request| (*handle_id, request)));
181 }
182 }
183 Poll::Pending
184 })
185 .await
186 }
187}
188
189enum CoordinationHandleManagerEvent {
190 NewHandle,
191 UpdateVnodeBitmap,
192 Stop,
193 CommitRequest { epoch: u64, metadata: SinkMetadata },
194 AlignInitialEpoch(u64),
195}
196
197impl CoordinationHandleManagerEvent {
198 fn name(&self) -> &'static str {
199 match self {
200 CoordinationHandleManagerEvent::NewHandle => "NewHandle",
201 CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
202 CoordinationHandleManagerEvent::Stop => "Stop",
203 CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
204 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
205 }
206 }
207}
208
209impl CoordinationHandleManager {
210 async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
211 {
212 select! {
213 handle = self.request_rx.recv() => {
214 let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
215 if handle.param() != &self.param {
216 warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
217 }
218 let handle_id = self.next_handle_id;
219 self.next_handle_id += 1;
220 self.writer_handles.insert(handle_id, handle);
221 Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
222 }
223 result = Self::next_request_inner(&mut self.writer_handles) => {
224 let (handle_id, request) = result?;
225 let event = match request {
226 coordinate_request::Msg::CommitRequest(request) => {
227 CoordinationHandleManagerEvent::CommitRequest {
228 epoch: request.epoch,
229 metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
230 }
231 }
232 coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
233 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
234 }
235 coordinate_request::Msg::UpdateVnodeRequest(_) => {
236 CoordinationHandleManagerEvent::UpdateVnodeBitmap
237 }
238 coordinate_request::Msg::Stop(_) => {
239 CoordinationHandleManagerEvent::Stop
240 }
241 coordinate_request::Msg::StartRequest(_) => {
242 unreachable!("should have been handled");
243 }
244 };
245 Ok((handle_id, event))
246 }
247 }
248 }
249 }
250
251 fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
252 self.writer_handles[&handle_id].vnode_bitmap()
253 }
254
255 fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
256 self.writer_handles
257 .remove(&handle_id)
258 .expect("should exist")
259 .stop()
260 }
261
262 async fn wait_init_handles(
263 &mut self,
264 log_store_rewind_start_epoch: Option<u64>,
265 ) -> anyhow::Result<HashSet<HandleId>> {
266 assert!(self.writer_handles.is_empty());
267 let mut init_requests = AligningRequests::default();
268 while !init_requests.aligned() {
269 let (handle_id, event) = self.next_event().await?;
270 let unexpected_event = match event {
271 CoordinationHandleManagerEvent::NewHandle => {
272 init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
273 continue;
274 }
275 event => event.name(),
276 };
277 return Err(anyhow!(
278 "expect new handle during init, but got {}",
279 unexpected_event
280 ));
281 }
282 self.start(
283 log_store_rewind_start_epoch,
284 init_requests.handle_ids.iter().cloned(),
285 )?;
286 if log_store_rewind_start_epoch.is_none() {
287 let mut align_requests = AligningRequests::default();
288 while !align_requests.aligned() {
289 let (handle_id, event) = self.next_event().await?;
290 match event {
291 CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
292 align_requests.add_new_request(
293 handle_id,
294 initial_epoch,
295 self.vnode_bitmap(handle_id),
296 )?;
297 }
298 other => {
299 return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
300 }
301 }
302 }
303 let aligned_initial_epoch = align_requests
304 .requests
305 .into_iter()
306 .max()
307 .expect("non-empty");
308 self.ack_aligned_initial_epoch(aligned_initial_epoch)?;
309 }
310 Ok(init_requests.handle_ids)
311 }
312
313 async fn alter_parallelisms(
314 &mut self,
315 altered_handles: impl Iterator<Item = HandleId>,
316 prev_commit_epoch: u64,
317 ) -> anyhow::Result<HashSet<HandleId>> {
318 let mut requests = AligningRequests::default();
319 for handle_id in altered_handles {
320 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
321 }
322 let mut remaining_handles: HashSet<_> = self
323 .writer_handles
324 .keys()
325 .filter(|handle_id| !requests.handle_ids.contains(handle_id))
326 .cloned()
327 .collect();
328 while !remaining_handles.is_empty() || !requests.aligned() {
329 let (handle_id, event) = self.next_event().await?;
330 match event {
331 CoordinationHandleManagerEvent::NewHandle => {
332 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
333 }
334 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
335 assert!(remaining_handles.remove(&handle_id));
336 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
337 }
338 CoordinationHandleManagerEvent::Stop => {
339 assert!(remaining_handles.remove(&handle_id));
340 self.stop_handle(handle_id)?;
341 }
342 CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
343 bail!(
344 "receive commit request on epoch {} from handle {} during alter parallelism",
345 epoch,
346 handle_id
347 );
348 }
349 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
350 bail!(
351 "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
352 epoch,
353 handle_id
354 );
355 }
356 }
357 }
358 self.start(Some(prev_commit_epoch), requests.handle_ids.iter().cloned())?;
359 Ok(requests.handle_ids)
360 }
361}
362
363pub struct CoordinatorWorker {
364 handle_manager: CoordinationHandleManager,
365}
366
367impl CoordinatorWorker {
368 pub async fn run(
369 param: SinkParam,
370 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
371 db: DatabaseConnection,
372 subscriber: SinkCommittedEpochSubscriber,
373 iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
374 ) {
375 let sink = match build_sink(param.clone()) {
376 Ok(sink) => sink,
377 Err(e) => {
378 error!(
379 error = %e.as_report(),
380 "unable to build sink with param {:?}",
381 param
382 );
383 return;
384 }
385 };
386
387 dispatch_sink!(sink, sink, {
388 let coordinator = match sink
389 .new_coordinator(db, Some(iceberg_compact_stat_sender))
390 .await
391 {
392 Ok(coordinator) => coordinator,
393 Err(e) => {
394 error!(
395 error = %e.as_report(),
396 "unable to build coordinator with param {:?}",
397 param
398 );
399 return;
400 }
401 };
402 Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
403 });
404 }
405
406 pub async fn execute_coordinator(
407 param: SinkParam,
408 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
409 coordinator: impl SinkCommitCoordinator,
410 subscriber: SinkCommittedEpochSubscriber,
411 ) {
412 let mut worker = CoordinatorWorker {
413 handle_manager: CoordinationHandleManager {
414 param,
415 writer_handles: HashMap::new(),
416 next_handle_id: 0,
417 request_rx,
418 },
419 };
420
421 if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
422 for handle in worker.handle_manager.writer_handles.into_values() {
423 handle.abort(Status::internal(format!(
424 "failed to run coordination: {:?}",
425 e.as_report()
426 )))
427 }
428 }
429 }
430
431 async fn run_coordination(
432 &mut self,
433 mut coordinator: impl SinkCommitCoordinator,
434 subscriber: SinkCommittedEpochSubscriber,
435 ) -> anyhow::Result<()> {
436 let initial_log_store_rewind_start_epoch = coordinator.init(subscriber).await?;
437 let sink_id = self.handle_manager.param.sink_id;
438 let mut running_handles = self
439 .handle_manager
440 .wait_init_handles(initial_log_store_rewind_start_epoch)
441 .await?;
442 let mut pending_epochs: BTreeMap<u64, AligningRequests<SinkMetadata>> = BTreeMap::new();
443 let mut pending_new_handles = vec![];
444 let mut prev_commit_epoch = None;
445 loop {
446 let (handle_id, event) = self.handle_manager.next_event().await?;
447 let (epoch, metadata) = match event {
448 CoordinationHandleManagerEvent::NewHandle => {
449 pending_new_handles.push(handle_id);
450 continue;
451 }
452 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
453 running_handles = self
454 .handle_manager
455 .alter_parallelisms(
456 pending_new_handles.drain(..).chain([handle_id]),
457 prev_commit_epoch.ok_or_else(|| {
458 anyhow!("should have committed once on alter parallelisms")
459 })?,
460 )
461 .await?;
462 continue;
463 }
464 CoordinationHandleManagerEvent::Stop => {
465 self.handle_manager.stop_handle(handle_id)?;
466 running_handles = self
467 .handle_manager
468 .alter_parallelisms(
469 pending_new_handles.drain(..),
470 prev_commit_epoch.ok_or_else(|| {
471 anyhow!("should have committed once on alter parallelisms")
472 })?,
473 )
474 .await?;
475 continue;
476 }
477 CoordinationHandleManagerEvent::CommitRequest { epoch, metadata } => {
478 (epoch, metadata)
479 }
480 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
481 bail!("receive AlignInitialEpoch after initialization")
482 }
483 };
484 if !running_handles.contains(&handle_id) {
485 bail!(
486 "receiving commit request from non-running handle {}, running handles: {:?}",
487 handle_id,
488 running_handles
489 );
490 }
491 pending_epochs.entry(epoch).or_default().add_new_request(
492 handle_id,
493 metadata,
494 self.handle_manager.vnode_bitmap(handle_id),
495 )?;
496 if pending_epochs
497 .first_key_value()
498 .expect("non-empty")
499 .1
500 .aligned()
501 {
502 let (epoch, requests) = pending_epochs.pop_first().expect("non-empty");
503
504 let start_time = Instant::now();
505 run_future_with_periodic_fn(
506 coordinator.commit(epoch, requests.requests),
507 Duration::from_secs(5),
508 || {
509 warn!(
510 elapsed = ?start_time.elapsed(),
511 sink_id = sink_id.sink_id,
512 "committing"
513 );
514 },
515 )
516 .await
517 .map_err(|e| anyhow!(e))?;
518 self.handle_manager.ack_commit(epoch, requests.handle_ids)?;
519 prev_commit_epoch = Some(epoch);
520 }
521 }
522 }
523}