1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use futures::future::{Either, pending, select};
24use futures::pin_mut;
25use itertools::Itertools;
26use risingwave_common::bail;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::Field;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::boxed::BoxTwoPhaseCoordinator;
32use risingwave_connector::sink::catalog::SinkId;
33use risingwave_connector::sink::{
34 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
35};
36use risingwave_meta_model::pending_sink_state::SinkState;
37use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
38use sea_orm::DatabaseConnection;
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::time::sleep;
43use tokio_retry::strategy::{ExponentialBackoff, jitter};
44use tonic::Status;
45use tracing::{error, warn};
46
47use crate::manager::sink_coordination::exactly_once_util::{
48 clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
49 persist_pre_commit_metadata,
50};
51use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
52
53async fn run_future_with_periodic_fn<F: Future>(
54 future: F,
55 interval: Duration,
56 mut f: impl FnMut(),
57) -> F::Output {
58 pin_mut!(future);
59 loop {
60 match select(&mut future, pin!(sleep(interval))).await {
61 Either::Left((output, _)) => {
62 break output;
63 }
64 Either::Right(_) => f(),
65 }
66 }
67}
68
/// Identifier used to key writer coordination handles inside this module.
type HandleId = usize;
70
71#[derive(Default)]
72struct AligningRequests<R> {
73 requests: Vec<R>,
74 handle_ids: HashSet<HandleId>,
75 committed_bitmap: Option<Bitmap>, }
77
78impl<R> AligningRequests<R> {
79 fn add_new_request(
80 &mut self,
81 handle_id: HandleId,
82 request: R,
83 vnode_bitmap: &Bitmap,
84 ) -> anyhow::Result<()>
85 where
86 R: Debug,
87 {
88 let committed_bitmap = self
89 .committed_bitmap
90 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
91 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
92
93 let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
94 if check_bitmap.count_ones() > 0 {
95 return Err(anyhow!(
96 "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
97 check_bitmap.iter_ones().collect_vec(),
98 vnode_bitmap,
99 committed_bitmap,
100 self.requests,
101 request
102 ));
103 }
104 *committed_bitmap |= vnode_bitmap;
105 self.requests.push(request);
106 assert!(self.handle_ids.insert(handle_id));
107 Ok(())
108 }
109
110 fn aligned(&self) -> bool {
111 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
112 }
113}
114
115type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
116type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;
117
118struct TwoPhaseCommitHandler {
119 db: DatabaseConnection,
120 sink_id: SinkId,
121 curr_hummock_committed_epoch: u64,
122 job_committed_epoch_rx: UnboundedReceiver<u64>,
123 last_committed_epoch: Option<u64>,
124 pending_epochs: VecDeque<(u64, Vec<u8>)>,
125 prepared_epochs: VecDeque<(u64, Vec<u8>)>,
126 backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
127}
128
129impl TwoPhaseCommitHandler {
130 fn new(
131 db: DatabaseConnection,
132 sink_id: SinkId,
133 initial_hummock_committed_epoch: u64,
134 job_committed_epoch_rx: UnboundedReceiver<u64>,
135 last_committed_epoch: Option<u64>,
136 ) -> Self {
137 Self {
138 db,
139 sink_id,
140 curr_hummock_committed_epoch: initial_hummock_committed_epoch,
141 job_committed_epoch_rx,
142 last_committed_epoch,
143 pending_epochs: VecDeque::new(),
144 prepared_epochs: VecDeque::new(),
145 backoff_state: None,
146 }
147 }
148
149 #[define_opaque(RetryBackoffStrategy)]
150 fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
151 ExponentialBackoff::from_millis(10)
152 .max_delay(Duration::from_secs(60))
153 .map(jitter)
154 .map(|delay| Box::pin(tokio::time::sleep(delay)))
155 }
156
157 async fn next_to_commit(&mut self) -> anyhow::Result<(u64, Vec<u8>)> {
158 loop {
159 let wait_backoff = async {
160 if self.prepared_epochs.is_empty() {
161 pending::<()>().await;
162 } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
163 backoff_fut.await;
164 }
165 };
166
167 select! {
168 _ = wait_backoff => {
169 let (epoch, metadata) = self.prepared_epochs.front().cloned().expect("non-empty");
170 return Ok((epoch, metadata));
171 }
172
173 recv_epoch = self.job_committed_epoch_rx.recv() => {
174 let Some(recv_epoch) = recv_epoch else {
175 return Err(anyhow!(
176 "Hummock committed epoch sender closed unexpectedly"
177 ));
178 };
179 self.curr_hummock_committed_epoch = recv_epoch;
180 while let Some((epoch, metadata)) = self.pending_epochs.pop_front_if(|(epoch, _)| *epoch <= recv_epoch) {
181 if let Some((last_epoch, _)) = self.prepared_epochs.back() {
182 assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
183 }
184 self.prepared_epochs.push_back((epoch, metadata));
185 }
186 }
187 }
188 }
189 }
190
191 fn push_new_item(&mut self, epoch: u64, metadata: Vec<u8>) {
192 if epoch > self.curr_hummock_committed_epoch {
193 if let Some((last_epoch, _)) = self.pending_epochs.back() {
194 assert!(
195 epoch > *last_epoch,
196 "pending epochs must be in increasing order"
197 );
198 }
199 self.pending_epochs.push_back((epoch, metadata));
200 } else {
201 assert!(self.pending_epochs.is_empty());
202 if let Some((last_epoch, _)) = self.prepared_epochs.back() {
203 assert!(
204 epoch > *last_epoch,
205 "prepared epochs must be in increasing order"
206 );
207 }
208 self.prepared_epochs.push_back((epoch, metadata));
209 }
210 }
211
212 async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
213 self.backoff_state = None;
214 let (last_epoch, _) = self.prepared_epochs.pop_front().expect("non-empty");
215 assert_eq!(last_epoch, epoch);
216
217 commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
218 self.last_committed_epoch = Some(epoch);
219 Ok(())
220 }
221
222 fn failed_committed(&mut self, epoch: u64, err: SinkError) {
223 assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch,);
224 if let Some((prev_fut, strategy)) = &mut self.backoff_state {
225 let new_fut = strategy.next().expect("infinite");
226 *prev_fut = new_fut;
227 } else {
228 let mut strategy = Self::get_retry_backoff_strategy();
229 let backoff_fut = strategy.next().expect("infinite");
230 self.backoff_state = Some((backoff_fut, strategy));
231 }
232 tracing::error!(
233 error = %err.as_report(),
234 %self.sink_id,
235 "failed to commit epoch {}, Retrying after backoff",
236 epoch,
237 );
238 }
239
240 fn is_empty(&self) -> bool {
241 self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
242 }
243}
244
245struct CoordinationHandleManager {
246 param: SinkParam,
247 writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
248 next_handle_id: HandleId,
249 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
250}
251
252impl CoordinationHandleManager {
253 fn start(
254 &mut self,
255 log_store_rewind_start_epoch: Option<u64>,
256 handle_ids: impl IntoIterator<Item = HandleId>,
257 ) -> anyhow::Result<()> {
258 for handle_id in handle_ids {
259 let handle = self
260 .writer_handles
261 .get_mut(&handle_id)
262 .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
263 handle.start(log_store_rewind_start_epoch).map_err(|_| {
264 anyhow!(
265 "fail to start {:?} for handle {}",
266 log_store_rewind_start_epoch,
267 handle_id
268 )
269 })?;
270 }
271 Ok(())
272 }
273
274 fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
275 for (handle_id, handle) in &mut self.writer_handles {
276 handle
277 .ack_aligned_initial_epoch(aligned_initial_epoch)
278 .map_err(|_| {
279 anyhow!(
280 "fail to ack_aligned_initial_epoch {:?} for handle {}",
281 aligned_initial_epoch,
282 handle_id
283 )
284 })?;
285 }
286 Ok(())
287 }
288
289 fn ack_commit(
290 &mut self,
291 epoch: u64,
292 handle_ids: impl IntoIterator<Item = HandleId>,
293 ) -> anyhow::Result<()> {
294 for handle_id in handle_ids {
295 let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
296 anyhow!(
297 "fail to find handle for {} when ack commit on epoch {}",
298 handle_id,
299 epoch
300 )
301 })?;
302 handle.ack_commit(epoch).map_err(|_| {
303 anyhow!(
304 "fail to ack commit on epoch {} for handle {}",
305 epoch,
306 handle_id
307 )
308 })?;
309 }
310 Ok(())
311 }
312
313 async fn next_request_inner(
314 writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
315 ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
316 poll_fn(|cx| {
317 for (handle_id, handle) in writer_handles.iter_mut() {
318 if let Poll::Ready(result) = handle.poll_next_request(cx) {
319 return Poll::Ready(result.map(|request| (*handle_id, request)));
320 }
321 }
322 Poll::Pending
323 })
324 .await
325 }
326}
327
328enum CoordinationHandleManagerEvent {
329 NewHandle,
330 UpdateVnodeBitmap,
331 Stop,
332 CommitRequest {
333 epoch: u64,
334 metadata: SinkMetadata,
335 add_columns: Option<Vec<Field>>,
336 },
337 AlignInitialEpoch(u64),
338}
339
340impl CoordinationHandleManagerEvent {
341 fn name(&self) -> &'static str {
342 match self {
343 CoordinationHandleManagerEvent::NewHandle => "NewHandle",
344 CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
345 CoordinationHandleManagerEvent::Stop => "Stop",
346 CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
347 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
348 }
349 }
350}
351
352impl CoordinationHandleManager {
353 async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
354 select! {
355 handle = self.request_rx.recv() => {
356 let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
357 if handle.param() != &self.param {
358 warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
359 }
360 let handle_id = self.next_handle_id;
361 self.next_handle_id += 1;
362 self.writer_handles.insert(handle_id, handle);
363 Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
364 }
365 result = Self::next_request_inner(&mut self.writer_handles) => {
366 let (handle_id, request) = result?;
367 let event = match request {
368 coordinate_request::Msg::CommitRequest(request) => {
369 CoordinationHandleManagerEvent::CommitRequest {
370 epoch: request.epoch,
371 metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
372 add_columns: request.add_columns.map(|add_columns| add_columns.fields.into_iter().map(|field| Field::from_prost(&field)).collect()),
373 }
374 }
375 coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
376 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
377 }
378 coordinate_request::Msg::UpdateVnodeRequest(_) => {
379 CoordinationHandleManagerEvent::UpdateVnodeBitmap
380 }
381 coordinate_request::Msg::Stop(_) => {
382 CoordinationHandleManagerEvent::Stop
383 }
384 coordinate_request::Msg::StartRequest(_) => {
385 unreachable!("should have been handled");
386 }
387 };
388 Ok((handle_id, event))
389 }
390 }
391 }
392
393 fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
394 self.writer_handles[&handle_id].vnode_bitmap()
395 }
396
397 fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
398 self.writer_handles
399 .remove(&handle_id)
400 .expect("should exist")
401 .stop()
402 }
403
404 async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
405 assert!(self.writer_handles.is_empty());
406 let mut init_requests = AligningRequests::default();
407 while !init_requests.aligned() {
408 let (handle_id, event) = self.next_event().await?;
409 let unexpected_event = match event {
410 CoordinationHandleManagerEvent::NewHandle => {
411 init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
412 continue;
413 }
414 event => event.name(),
415 };
416 return Err(anyhow!(
417 "expect new handle during init, but got {}",
418 unexpected_event
419 ));
420 }
421 Ok(init_requests.handle_ids)
422 }
423
424 async fn alter_parallelisms(
425 &mut self,
426 altered_handles: impl Iterator<Item = HandleId>,
427 ) -> anyhow::Result<HashSet<HandleId>> {
428 let mut requests = AligningRequests::default();
429 for handle_id in altered_handles {
430 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
431 }
432 let mut remaining_handles: HashSet<_> = self
433 .writer_handles
434 .keys()
435 .filter(|handle_id| !requests.handle_ids.contains(handle_id))
436 .cloned()
437 .collect();
438 while !remaining_handles.is_empty() || !requests.aligned() {
439 let (handle_id, event) = self.next_event().await?;
440 match event {
441 CoordinationHandleManagerEvent::NewHandle => {
442 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
443 }
444 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
445 assert!(remaining_handles.remove(&handle_id));
446 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
447 }
448 CoordinationHandleManagerEvent::Stop => {
449 assert!(remaining_handles.remove(&handle_id));
450 self.stop_handle(handle_id)?;
451 }
452 CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
453 bail!(
454 "receive commit request on epoch {} from handle {} during alter parallelism",
455 epoch,
456 handle_id
457 );
458 }
459 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
460 bail!(
461 "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
462 epoch,
463 handle_id
464 );
465 }
466 }
467 }
468 Ok(requests.handle_ids)
469 }
470}
471
472enum CoordinatorWorkerState {
478 Running,
479 WaitingForFlushed(HashSet<HandleId>),
480}
481
482pub struct CoordinatorWorker {
483 handle_manager: CoordinationHandleManager,
484 prev_committed_epoch: Option<u64>,
485 curr_state: CoordinatorWorkerState,
486}
487
488enum CoordinatorWorkerEvent {
489 HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
490 ReadyToCommit(u64, Vec<u8>),
491}
492
493impl CoordinatorWorker {
494 pub async fn run(
495 param: SinkParam,
496 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
497 db: DatabaseConnection,
498 subscriber: SinkCommittedEpochSubscriber,
499 iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
500 ) {
501 let sink = match build_sink(param.clone()) {
502 Ok(sink) => sink,
503 Err(e) => {
504 error!(
505 error = %e.as_report(),
506 "unable to build sink with param {:?}",
507 param
508 );
509 return;
510 }
511 };
512
513 dispatch_sink!(sink, sink, {
514 let coordinator = match sink
515 .new_coordinator(Some(iceberg_compact_stat_sender))
516 .await
517 {
518 Ok(coordinator) => coordinator,
519 Err(e) => {
520 error!(
521 error = %e.as_report(),
522 "unable to build coordinator with param {:?}",
523 param
524 );
525 return;
526 }
527 };
528 Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
529 });
530 }
531
532 pub async fn execute_coordinator(
533 db: DatabaseConnection,
534 param: SinkParam,
535 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
536 coordinator: SinkCommitCoordinator,
537 subscriber: SinkCommittedEpochSubscriber,
538 ) {
539 let mut worker = CoordinatorWorker {
540 handle_manager: CoordinationHandleManager {
541 param,
542 writer_handles: HashMap::new(),
543 next_handle_id: 0,
544 request_rx,
545 },
546 prev_committed_epoch: None,
547 curr_state: CoordinatorWorkerState::Running,
548 };
549
550 if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
551 for handle in worker.handle_manager.writer_handles.into_values() {
552 handle.abort(Status::internal(format!(
553 "failed to run coordination: {:?}",
554 e.as_report()
555 )))
556 }
557 }
558 }
559
560 async fn try_handle_init_requests(
561 &mut self,
562 pending_handle_ids: &HashSet<HandleId>,
563 two_phase_handler: Option<&mut TwoPhaseCommitHandler>,
564 ) -> anyhow::Result<()> {
565 assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
566 if let Some(two_phase_handler) = two_phase_handler
567 && !two_phase_handler.is_empty()
568 {
569 self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
571 } else {
572 self.handle_init_requests_impl(pending_handle_ids.clone())
573 .await?;
574 }
575 Ok(())
576 }
577
578 async fn handle_init_requests_impl(
579 &mut self,
580 pending_handle_ids: impl IntoIterator<Item = HandleId>,
581 ) -> anyhow::Result<()> {
582 let log_store_rewind_start_epoch = self.prev_committed_epoch;
583 self.handle_manager
584 .start(log_store_rewind_start_epoch, pending_handle_ids)?;
585 if log_store_rewind_start_epoch.is_none() {
586 let mut align_requests = AligningRequests::default();
587 while !align_requests.aligned() {
588 let (handle_id, event) = self.handle_manager.next_event().await?;
589 match event {
590 CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
591 align_requests.add_new_request(
592 handle_id,
593 initial_epoch,
594 self.handle_manager.vnode_bitmap(handle_id),
595 )?;
596 }
597 other => {
598 return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
599 }
600 }
601 }
602 let aligned_initial_epoch = align_requests
603 .requests
604 .into_iter()
605 .max()
606 .expect("non-empty");
607 self.handle_manager
608 .ack_aligned_initial_epoch(aligned_initial_epoch)?;
609 }
610 Ok(())
611 }
612
613 async fn next_event(
614 &mut self,
615 mut two_phase_handler: Option<&mut TwoPhaseCommitHandler>,
616 ) -> anyhow::Result<CoordinatorWorkerEvent> {
617 if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state {
618 let handler = two_phase_handler
619 .as_mut()
620 .expect("two-phase handler should exist when waiting for flush");
621 if handler.is_empty() {
622 let pending_handle_ids = pending_handle_ids.clone();
623 self.handle_init_requests_impl(pending_handle_ids).await?;
624 self.curr_state = CoordinatorWorkerState::Running;
625 }
626 }
627
628 let two_phase_next_fut = async {
630 if let Some(handler) = two_phase_handler {
631 handler.next_to_commit().await
632 } else {
633 pending().await
634 }
635 };
636 select! {
637 next_handle_event = self.handle_manager.next_event() => {
638 let (handle_id, event) = next_handle_event?;
639 Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
640 }
641
642 next_item_to_commit = two_phase_next_fut => {
643 let (epoch, metadata) = next_item_to_commit?;
644 Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata))
645 }
646 }
647 }
648
649 async fn run_coordination(
650 &mut self,
651 db: DatabaseConnection,
652 mut coordinator: SinkCommitCoordinator,
653 subscriber: SinkCommittedEpochSubscriber,
654 ) -> anyhow::Result<()> {
655 let sink_id = self.handle_manager.param.sink_id;
656
657 let mut two_phase_handler = match &mut coordinator {
658 SinkCommitCoordinator::SinglePhase(coordinator) => {
659 coordinator.init().await?;
660 None
661 }
662 SinkCommitCoordinator::TwoPhase(coordinator) => {
663 let two_phase_handler = self
664 .init_state_from_store(&db, sink_id, subscriber, coordinator)
665 .await?;
666 coordinator.init().await?;
667 Some(two_phase_handler)
668 }
669 };
670
671 let mut running_handles = self.handle_manager.wait_init_handles().await?;
672 self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
673 .await?;
674
675 let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
676 let mut pending_new_handles = vec![];
677 loop {
678 let event = self.next_event(two_phase_handler.as_mut()).await?;
679 let (handle_id, epoch, commit_request) = match event {
680 CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
681 CoordinationHandleManagerEvent::NewHandle => {
682 pending_new_handles.push(handle_id);
683 continue;
684 }
685 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
686 running_handles = self
687 .handle_manager
688 .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
689 .await?;
690 self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
691 .await?;
692 continue;
693 }
694 CoordinationHandleManagerEvent::Stop => {
695 self.handle_manager.stop_handle(handle_id)?;
696 running_handles = self
697 .handle_manager
698 .alter_parallelisms(pending_new_handles.drain(..))
699 .await?;
700 self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
701 .await?;
702
703 continue;
704 }
705 CoordinationHandleManagerEvent::CommitRequest {
706 epoch,
707 metadata,
708 add_columns,
709 } => (handle_id, epoch, (metadata, add_columns)),
710 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
711 bail!("receive AlignInitialEpoch after initialization")
712 }
713 },
714 CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata) => {
715 let SinkCommitCoordinator::TwoPhase(coordinator) = &mut coordinator else {
716 unreachable!("should be two-phase commit coordinator");
717 };
718 let two_phase_handler = two_phase_handler.as_mut().expect("should exist");
719 let start_time = Instant::now();
720 let commit_res = run_future_with_periodic_fn(
721 coordinator.commit(epoch, metadata),
722 Duration::from_secs(5),
723 || {
724 warn!(
725 elapsed = ?start_time.elapsed(),
726 %sink_id,
727 "committing"
728 );
729 },
730 )
731 .await;
732
733 match commit_res {
734 Ok(_) => {
735 two_phase_handler.ack_committed(epoch).await?;
736 self.prev_committed_epoch = Some(epoch);
737 }
738 Err(e) => {
739 two_phase_handler.failed_committed(epoch, e);
740 }
741 }
742
743 continue;
744 }
745 };
746 if !running_handles.contains(&handle_id) {
747 bail!(
748 "receiving commit request from non-running handle {}, running handles: {:?}",
749 handle_id,
750 running_handles
751 );
752 }
753 pending_epochs.entry(epoch).or_default().add_new_request(
754 handle_id,
755 commit_request,
756 self.handle_manager.vnode_bitmap(handle_id),
757 )?;
758 if pending_epochs
759 .first_key_value()
760 .expect("non-empty")
761 .1
762 .aligned()
763 {
764 let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
765 let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
766 let mut requests = commit_requests.requests.into_iter();
767 let (first_metadata, first_add_columns) = requests.next().expect("non-empty");
768 metadatas.push(first_metadata);
769 for (metadata, add_columns) in requests {
770 if first_add_columns != add_columns {
771 return Err(anyhow!(
772 "got different add columns {:?} to prev add columns {:?}",
773 add_columns,
774 first_add_columns
775 ));
776 }
777 metadatas.push(metadata);
778 }
779
780 match &mut coordinator {
781 SinkCommitCoordinator::SinglePhase(coordinator) => {
782 let start_time = Instant::now();
783 run_future_with_periodic_fn(
784 coordinator.commit(epoch, metadatas, first_add_columns),
785 Duration::from_secs(5),
786 || {
787 warn!(
788 elapsed = ?start_time.elapsed(),
789 %sink_id,
790 "committing"
791 );
792 },
793 )
794 .await
795 .map_err(|e| anyhow!(e))?;
796 self.handle_manager
797 .ack_commit(epoch, commit_requests.handle_ids)?;
798 self.prev_committed_epoch = Some(epoch);
799 }
800 SinkCommitCoordinator::TwoPhase(coordinator) => {
801 let commit_metadata = coordinator
802 .pre_commit(epoch, metadatas, first_add_columns)
803 .await?;
804 persist_pre_commit_metadata(
805 &db,
806 sink_id as _,
807 epoch,
808 commit_metadata.clone(),
809 )
810 .await?;
811 self.handle_manager
812 .ack_commit(epoch, commit_requests.handle_ids)?;
813
814 let two_phase_handler = two_phase_handler.as_mut().expect("should exist");
815 two_phase_handler.push_new_item(epoch, commit_metadata);
816 }
817 }
818 }
819 }
820 }
821
822 async fn init_state_from_store(
824 &mut self,
825 db: &DatabaseConnection,
826 sink_id: SinkId,
827 subscriber: SinkCommittedEpochSubscriber,
828 coordinator: &mut BoxTwoPhaseCoordinator,
829 ) -> anyhow::Result<TwoPhaseCommitHandler> {
830 let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;
831
832 let mut metadata_iter = ordered_metadata.into_iter().peekable();
833 let last_committed_epoch = metadata_iter
834 .next_if(|(_, state, _)| matches!(state, SinkState::Committed))
835 .map(|(epoch, _, _)| epoch);
836 self.prev_committed_epoch = last_committed_epoch;
837
838 let pending_items = metadata_iter
839 .peeking_take_while(|(_, state, _)| matches!(state, SinkState::Pending))
840 .map(|(epoch, _, metadata)| (epoch, metadata))
841 .collect_vec();
842
843 let mut aborted_epochs = vec![];
844
845 for (epoch, state, metadata) in metadata_iter {
846 match state {
847 SinkState::Aborted => {
848 coordinator.abort(epoch, metadata).await;
849 aborted_epochs.push(epoch);
850 }
851 other => {
852 unreachable!(
853 "unexpected state {:?} after pending items at epoch {}",
854 other, epoch
855 );
856 }
857 }
858 }
859
860 clean_aborted_records(db, sink_id, aborted_epochs).await?;
862
863 let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
864 let mut two_phase_handler = TwoPhaseCommitHandler::new(
865 db.clone(),
866 sink_id,
867 initial_hummock_committed_epoch,
868 job_committed_epoch_rx,
869 last_committed_epoch,
870 );
871
872 for (epoch, metadata) in pending_items {
873 two_phase_handler.push_new_item(epoch, metadata);
874 }
875
876 Ok(two_phase_handler)
877 }
878}