use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::{Future, poll_fn};
use std::pin::pin;
use std::task::Poll;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::future::{Either, pending, select};
use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{
    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
};
use risingwave_meta_model::pending_sink_state::SinkState;
use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::sleep;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tonic::Status;
use tracing::{error, warn};

use crate::manager::sink_coordination::exactly_once_util::{
    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
    persist_pre_commit_metadata,
};
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

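/// Drive `future` to completion, invoking `f` every `interval` while it is still pending
/// (used below to periodically log that a commit is still in progress).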
async fn run_future_with_periodic_fn<F: Future>(
    future: F,
    interval: Duration,
    mut f: impl FnMut(),
) -> F::Output {
    pin_mut!(future);
    loop {
        match select(&mut future, pin!(sleep(interval))).await {
            Either::Left((output, _)) => {
                break output;
            }
            Either::Right(_) => f(),
        }
    }
}

type HandleId = usize;

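/// Accumulates requests from individual sink writers and tracks the union of their vnode
/// bitmaps, so the coordinator knows when every vnode has reported and the requests are aligned.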
#[derive(Default)]
struct AligningRequests<R> {
    requests: Vec<R>,
    handle_ids: HashSet<HandleId>,
    committed_bitmap: Option<Bitmap>,
}

impl<R> AligningRequests<R> {
    fn add_new_request(
        &mut self,
        handle_id: HandleId,
        request: R,
        vnode_bitmap: &Bitmap,
    ) -> anyhow::Result<()>
    where
        R: Debug,
    {
        let committed_bitmap = self
            .committed_bitmap
            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());

        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
        if check_bitmap.count_ones() > 0 {
            return Err(anyhow!(
                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
                check_bitmap.iter_ones().collect_vec(),
                vnode_bitmap,
                committed_bitmap,
                self.requests,
                request
            ));
        }
        *committed_bitmap |= vnode_bitmap;
        self.requests.push(request);
        assert!(self.handle_ids.insert(handle_id));
        Ok(())
    }

    fn aligned(&self) -> bool {
        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
    }
}

type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

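/// Bookkeeping for exactly-once (two-phase) sink commits. Pre-committed items wait in
/// `pending_epochs` until the corresponding hummock epoch is committed, move to
/// `prepared_epochs` once they are ready for the final sink commit, and are retried with
/// exponential backoff on failure.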
struct TwoPhaseCommitHandler {
    db: DatabaseConnection,
    sink_id: SinkId,
    curr_hummock_committed_epoch: u64,
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    last_committed_epoch: Option<u64>,
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}

impl TwoPhaseCommitHandler {
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

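    /// Wait until some epoch is ready for the final commit and return its payload. Newly
    /// received hummock committed epochs promote matching `pending_epochs` entries to
    /// `prepared_epochs`; when a retry backoff is active, the front item is returned only
    /// after the backoff elapses.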
    async fn next_to_commit(
        &mut self,
    ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
        loop {
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let item = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok(item);
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    while let Some((epoch, metadata, schema_change)) = self
                        .pending_epochs
                        .pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch)
                    {
                        if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                            assert!(
                                epoch > *last_epoch,
                                "prepared epochs must be in increasing order"
                            );
                        }
                        self.prepared_epochs
                            .push_back((epoch, metadata, schema_change));
                    }
                }
            }
        }
    }

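    /// Queue a pre-committed item: epochs not yet committed in hummock go to `pending_epochs`,
    /// otherwise directly to `prepared_epochs`. Epochs must arrive in increasing order.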
    fn push_new_item(
        &mut self,
        epoch: u64,
        metadata: Option<Vec<u8>>,
        schema_change: Option<PbSinkSchemaChange>,
    ) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs
                .push_back((epoch, metadata, schema_change));
        } else {
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs
                .push_back((epoch, metadata, schema_change));
        }
    }

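    /// Record a successful commit of `epoch`: clear any backoff, pop it from `prepared_epochs`,
    /// and persist the committed state to the meta store.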
    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

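    /// Record a failed commit of `epoch` and arm (or advance) the exponential backoff before
    /// the next retry.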
    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, retrying after backoff",
            epoch,
        );
    }

    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }

    fn has_uncommitted_schema_change(&self) -> bool {
        if let Some((_, _, schema_change)) = self.pending_epochs.back() {
            schema_change.is_some()
        } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
            schema_change.is_some()
        } else {
            false
        }
    }
}

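/// Owns the per-writer coordination handles and multiplexes their requests into a single
/// event stream for the coordinator worker.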
struct CoordinationHandleManager {
    param: SinkParam,
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    next_handle_id: HandleId,
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}

impl CoordinationHandleManager {
    fn start(
        &mut self,
        log_store_rewind_start_epoch: Option<u64>,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self
                .writer_handles
                .get_mut(&handle_id)
                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id))?;
            handle.start(log_store_rewind_start_epoch).map_err(|_| {
                anyhow!(
                    "fail to start {:?} for handle {}",
                    log_store_rewind_start_epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
        for (handle_id, handle) in &mut self.writer_handles {
            handle
                .ack_aligned_initial_epoch(aligned_initial_epoch)
                .map_err(|_| {
                    anyhow!(
                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
                        aligned_initial_epoch,
                        handle_id
                    )
                })?;
        }
        Ok(())
    }

    fn ack_commit(
        &mut self,
        epoch: u64,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
                anyhow!(
                    "fail to find handle for {} when ack commit on epoch {}",
                    handle_id,
                    epoch
                )
            })?;
            handle.ack_commit(epoch).map_err(|_| {
                anyhow!(
                    "fail to ack commit on epoch {} for handle {}",
                    epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    async fn next_request_inner(
        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
        poll_fn(|cx| {
            for (handle_id, handle) in writer_handles.iter_mut() {
                if let Poll::Ready(result) = handle.poll_next_request(cx) {
                    return Poll::Ready(result.map(|request| (*handle_id, request)));
                }
            }
            Poll::Pending
        })
        .await
    }
}

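/// Events surfaced by `CoordinationHandleManager::next_event`: either a newly registered
/// writer handle or a request received from an existing one.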
enum CoordinationHandleManagerEvent {
    NewHandle,
    UpdateVnodeBitmap,
    Stop,
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        schema_change: Option<PbSinkSchemaChange>,
    },
    AlignInitialEpoch(u64),
}

impl CoordinationHandleManagerEvent {
    fn name(&self) -> &'static str {
        match self {
            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
            CoordinationHandleManagerEvent::Stop => "Stop",
            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
        }
    }
}

impl CoordinationHandleManager {
    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
        select! {
            handle = self.request_rx.recv() => {
                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
                if handle.param() != &self.param {
                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
                }
                let handle_id = self.next_handle_id;
                self.next_handle_id += 1;
                self.writer_handles.insert(handle_id, handle);
                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
            }
            result = Self::next_request_inner(&mut self.writer_handles) => {
                let (handle_id, request) = result?;
                let event = match request {
                    coordinate_request::Msg::CommitRequest(request) => {
                        CoordinationHandleManagerEvent::CommitRequest {
                            epoch: request.epoch,
                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
                            schema_change: request.schema_change,
                        }
                    }
                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
                    }
                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
                    }
                    coordinate_request::Msg::Stop(_) => {
                        CoordinationHandleManagerEvent::Stop
                    }
                    coordinate_request::Msg::StartRequest(_) => {
                        unreachable!("should have been handled");
                    }
                };
                Ok((handle_id, event))
            }
        }
    }

    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
        self.writer_handles[&handle_id].vnode_bitmap()
    }

    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
        self.writer_handles
            .remove(&handle_id)
            .expect("should exist")
            .stop()
    }

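    /// Wait for the initial set of writer handles to register until their vnode bitmaps
    /// cover all vnodes, and return their handle ids.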
    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
        assert!(self.writer_handles.is_empty());
        let mut init_requests = AligningRequests::default();
        while !init_requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            let unexpected_event = match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                    continue;
                }
                event => event.name(),
            };
            return Err(anyhow!(
                "expect new handle during init, but got {}",
                unexpected_event
            ));
        }
        Ok(init_requests.handle_ids)
    }

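    /// Re-align the set of writer handles after a parallelism change: collect updated vnode
    /// bitmaps from `altered_handles`, newly registered handles, and the remaining writers
    /// until the bitmaps are aligned again, and return the new set of running handle ids.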
    async fn alter_parallelisms(
        &mut self,
        altered_handles: impl Iterator<Item = HandleId>,
    ) -> anyhow::Result<HashSet<HandleId>> {
        let mut requests = AligningRequests::default();
        for handle_id in altered_handles {
            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
        }
        let mut remaining_handles: HashSet<_> = self
            .writer_handles
            .keys()
            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
            .cloned()
            .collect();
        while !remaining_handles.is_empty() || !requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    assert!(remaining_handles.remove(&handle_id));
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::Stop => {
                    assert!(remaining_handles.remove(&handle_id));
                    self.stop_handle(handle_id)?;
                }
                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
                    bail!(
                        "receive commit request on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
                    bail!(
                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
            }
        }
        Ok(requests.handle_ids)
    }
}

enum CoordinatorWorkerState {
    Running,
    WaitingForFlushed(HashSet<HandleId>),
}

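/// The per-sink coordinator task: aligns commit requests from all sink writers, drives the
/// sink's commit coordinator, and persists two-phase commit state for exactly-once sinks.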
pub struct CoordinatorWorker {
    handle_manager: CoordinationHandleManager,
    prev_committed_epoch: Option<u64>,
    curr_state: CoordinatorWorkerState,
}

enum CoordinatorWorkerEvent {
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
}

impl CoordinatorWorker {
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        dispatch_sink!(sink, sink, {
            let coordinator = match sink
                .new_coordinator(Some(iceberg_compact_stat_sender))
                .await
            {
                Ok(coordinator) => coordinator,
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "unable to build coordinator with param {:?}",
                        param
                    );
                    return;
                }
            };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }

    pub async fn execute_coordinator(
        db: DatabaseConnection,
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) {
        let mut worker = CoordinatorWorker {
            handle_manager: CoordinationHandleManager {
                param,
                writer_handles: HashMap::new(),
                next_handle_id: 0,
                request_rx,
            },
            prev_committed_epoch: None,
            curr_state: CoordinatorWorkerState::Running,
        };

        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
            for handle in worker.handle_manager.writer_handles.into_values() {
                handle.abort(Status::internal(format!(
                    "failed to run coordination: {:?}",
                    e.as_report()
                )))
            }
        }
    }

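    /// Start the given handles immediately, unless there is still an uncommitted schema change
    /// pending; in that case defer the start until the two-phase handler drains
    /// (see `next_event`).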
    async fn try_handle_init_requests(
        &mut self,
        pending_handle_ids: &HashSet<HandleId>,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<()> {
        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
        if two_phase_handler.has_uncommitted_schema_change() {
            self.curr_state =
                CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
        } else {
            self.handle_init_requests_impl(pending_handle_ids.clone())
                .await?;
        }
        Ok(())
    }

    async fn handle_init_requests_impl(
        &mut self,
        pending_handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        let log_store_rewind_start_epoch = self.prev_committed_epoch;
        self.handle_manager
            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
        if log_store_rewind_start_epoch.is_none() {
            let mut align_requests = AligningRequests::default();
            while !align_requests.aligned() {
                let (handle_id, event) = self.handle_manager.next_event().await?;
                match event {
                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
                        align_requests.add_new_request(
                            handle_id,
                            initial_epoch,
                            self.handle_manager.vnode_bitmap(handle_id),
                        )?;
                    }
                    other => {
                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
                    }
                }
            }
            let aligned_initial_epoch = align_requests
                .requests
                .into_iter()
                .max()
                .expect("non-empty");
            self.handle_manager
                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
        }
        Ok(())
    }

    async fn next_event(
        &mut self,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<CoordinatorWorkerEvent> {
        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
            && two_phase_handler.is_empty()
        {
            let pending_handle_ids = pending_handle_ids.clone();
            self.handle_init_requests_impl(pending_handle_ids).await?;
            self.curr_state = CoordinatorWorkerState::Running;
        }

        select! {
            next_handle_event = self.handle_manager.next_event() => {
                let (handle_id, event) = next_handle_event?;
                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
            }

            next_item_to_commit = two_phase_handler.next_to_commit() => {
                let (epoch, metadata, schema_change) = next_item_to_commit?;
                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
            }
        }
    }

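    /// Main coordination loop: recover two-phase state from the meta store, wait for writers to
    /// register, then align per-epoch commit requests, pre-commit / commit them through the sink
    /// coordinator, and acknowledge writers once each epoch is durable.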
    async fn run_coordination(
        &mut self,
        db: DatabaseConnection,
        mut coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) -> anyhow::Result<()> {
        let sink_id = self.handle_manager.param.sink_id;

        let mut two_phase_handler = self
            .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
            .await?;
        match &mut coordinator {
            SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
            SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
        }

        let mut running_handles = self.handle_manager.wait_init_handles().await?;
        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
            .await?;

        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
        let mut pending_new_handles = vec![];
        loop {
            let event = self.next_event(&mut two_phase_handler).await?;
            let (handle_id, epoch, commit_request) = match event {
                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
                    CoordinationHandleManagerEvent::NewHandle => {
                        pending_new_handles.push(handle_id);
                        continue;
                    }
                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;
                        continue;
                    }
                    CoordinationHandleManagerEvent::Stop => {
                        self.handle_manager.stop_handle(handle_id)?;
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;

                        continue;
                    }
                    CoordinationHandleManagerEvent::CommitRequest {
                        epoch,
                        metadata,
                        schema_change,
                    } => (handle_id, epoch, (metadata, schema_change)),
                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
                        bail!("receive AlignInitialEpoch after initialization")
                    }
                },
                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
                    let start_time = Instant::now();
                    let commit_fut = async {
                        match &mut coordinator {
                            SinkCommitCoordinator::SinglePhase(coordinator) => {
                                assert!(metadata.is_none());
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "single_phase_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                            SinkCommitCoordinator::TwoPhase(coordinator) => {
                                if let Some(metadata) = metadata {
                                    coordinator
                                        .commit_data(epoch, metadata)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_data",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                        }
                        Ok(())
                    };
                    let commit_res =
                        run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
                            warn!(
                                elapsed = ?start_time.elapsed(),
                                %sink_id,
                                "committing"
                            );
                        })
                        .await;

                    match commit_res {
                        Ok(_) => {
                            two_phase_handler.ack_committed(epoch).await?;
                            self.prev_committed_epoch = Some(epoch);
                        }
                        Err(e) => {
                            two_phase_handler.failed_committed(epoch, e);
                        }
                    }

                    continue;
                }
            };
            if !running_handles.contains(&handle_id) {
                bail!(
                    "receiving commit request from non-running handle {}, running handles: {:?}",
                    handle_id,
                    running_handles
                );
            }
            pending_epochs.entry(epoch).or_default().add_new_request(
                handle_id,
                commit_request,
                self.handle_manager.vnode_bitmap(handle_id),
            )?;
            if pending_epochs
                .first_key_value()
                .expect("non-empty")
                .1
                .aligned()
            {
                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
                let mut requests = commit_requests.requests.into_iter();
                let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
                metadatas.push(first_metadata);
                for (metadata, schema_change) in requests {
                    if first_schema_change != schema_change {
                        return Err(anyhow!(
                            "schema change {:?} differs from previous schema change {:?}",
                            schema_change,
                            first_schema_change
                        ));
                    }
                    metadatas.push(metadata);
                }

                match &mut coordinator {
                    SinkCommitCoordinator::SinglePhase(coordinator) => {
                        if !metadatas.is_empty() {
                            let start_time = Instant::now();
                            run_future_with_periodic_fn(
                                coordinator.commit_data(epoch, metadatas).instrument_await(
                                    Self::commit_span("single_phase_commit_data", sink_id, epoch),
                                ),
                                Duration::from_secs(5),
                                || {
                                    warn!(
                                        elapsed = ?start_time.elapsed(),
                                        %sink_id,
                                        "committing"
                                    );
                                },
                            )
                            .await
                            .map_err(|e| anyhow!(e))?;
                        }
                        if first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                None,
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(epoch, None, first_schema_change);
                        } else {
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                    SinkCommitCoordinator::TwoPhase(coordinator) => {
                        let commit_metadata = coordinator
                            .pre_commit(epoch, metadatas, first_schema_change.clone())
                            .instrument_await(Self::commit_span(
                                "two_phase_pre_commit",
                                sink_id,
                                epoch,
                            ))
                            .await?;
                        if commit_metadata.is_some() || first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                commit_metadata.clone(),
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(
                                epoch,
                                commit_metadata,
                                first_schema_change,
                            );
                        } else {
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                }

                self.handle_manager
                    .ack_commit(epoch, commit_requests.handle_ids)?;
            }
        }
    }

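    /// Rebuild the `TwoPhaseCommitHandler` from persisted sink states: restore the last committed
    /// epoch, re-queue pending pre-committed epochs, and abort and clean up any aborted records.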
    async fn init_state_from_store(
        &mut self,
        db: &DatabaseConnection,
        sink_id: SinkId,
        subscriber: SinkCommittedEpochSubscriber,
        coordinator: &mut SinkCommitCoordinator,
    ) -> anyhow::Result<TwoPhaseCommitHandler> {
        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;

        let mut metadata_iter = ordered_metadata.into_iter().peekable();
        let last_committed_epoch = metadata_iter
            .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
            .map(|(epoch, _, _, _)| epoch);
        self.prev_committed_epoch = last_committed_epoch;

        let pending_items = metadata_iter
            .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
            .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
            .collect_vec();

        let mut aborted_epochs = vec![];

        for (epoch, state, metadata, _) in metadata_iter {
            match state {
                SinkState::Aborted => {
                    if let Some(metadata) = metadata
                        && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
                    {
                        coordinator.abort(epoch, metadata).await;
                    }
                    aborted_epochs.push(epoch);
                }
                other => {
                    unreachable!(
                        "unexpected state {:?} after pending items at epoch {}",
                        other, epoch
                    );
                }
            }
        }

        clean_aborted_records(db, sink_id, aborted_epochs).await?;

        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
        let mut two_phase_handler = TwoPhaseCommitHandler::new(
            db.clone(),
            sink_id,
            initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
        );

        for (epoch, metadata, schema_change) in pending_items {
            two_phase_handler.push_new_item(epoch, metadata, schema_change);
        }

        Ok(two_phase_handler)
    }

    fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
        await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
    }
}