1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::{Either, pending, select};
25use futures::pin_mut;
26use itertools::Itertools;
27use risingwave_common::bail;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::SinkId;
32use risingwave_connector::sink::{
33 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
34};
35use risingwave_meta_model::pending_sink_state::SinkState;
36use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
37use risingwave_pb::stream_plan::PbSinkSchemaChange;
38use sea_orm::DatabaseConnection;
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::time::sleep;
43use tokio_retry::strategy::{ExponentialBackoff, jitter};
44use tonic::Status;
45use tracing::{error, warn};
46
47use crate::manager::sink_coordination::exactly_once_util::{
48 clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
49 persist_pre_commit_metadata,
50};
51use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
52
53async fn run_future_with_periodic_fn<F: Future>(
54 future: F,
55 interval: Duration,
56 mut f: impl FnMut(),
57) -> F::Output {
58 pin_mut!(future);
59 loop {
60 match select(&mut future, pin!(sleep(interval))).await {
61 Either::Left((output, _)) => {
62 break output;
63 }
64 Either::Right(_) => f(),
65 }
66 }
67}
68
69type HandleId = usize;
70
71#[derive(Default)]
72struct AligningRequests<R> {
73 requests: Vec<R>,
74 handle_ids: HashSet<HandleId>,
75 committed_bitmap: Option<Bitmap>, }
77
78impl<R> AligningRequests<R> {
79 fn add_new_request(
80 &mut self,
81 handle_id: HandleId,
82 request: R,
83 vnode_bitmap: &Bitmap,
84 ) -> anyhow::Result<()>
85 where
86 R: Debug,
87 {
88 let committed_bitmap = self
89 .committed_bitmap
90 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
91 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
92
93 let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
94 if check_bitmap.count_ones() > 0 {
95 return Err(anyhow!(
96 "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
97 check_bitmap.iter_ones().collect_vec(),
98 vnode_bitmap,
99 committed_bitmap,
100 self.requests,
101 request
102 ));
103 }
104 *committed_bitmap |= vnode_bitmap;
105 self.requests.push(request);
106 assert!(self.handle_ids.insert(handle_id));
107 Ok(())
108 }
109
110 fn aligned(&self) -> bool {
111 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
112 }
113}
114
/// One step of the retry backoff schedule: a boxed, pinned sleep.
type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
/// Opaque (TAIT) infinite iterator of backoff sleeps; defined by
/// `TwoPhaseCommitHandler::get_retry_backoff_strategy` via `#[define_opaque]`.
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

/// Tracks epochs that were pre-committed (phase one, persisted to the meta
/// store) and releases them for the phase-two commit once the corresponding
/// hummock epoch is known to be committed.
struct TwoPhaseCommitHandler {
    db: DatabaseConnection,
    sink_id: SinkId,
    // Latest hummock committed epoch observed from `job_committed_epoch_rx`.
    curr_hummock_committed_epoch: u64,
    // Stream of newly committed hummock epochs for this sink's job.
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    // Epoch of the last successful phase-two commit, if any.
    last_committed_epoch: Option<u64>,
    // Pre-committed items whose epoch is not yet hummock-committed, as
    // (epoch, commit metadata, schema change), strictly increasing by epoch.
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    // Items whose epoch is hummock-committed and ready for phase-two commit,
    // strictly increasing by epoch.
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    // Present only while retrying a failed commit: the in-flight backoff sleep
    // plus the strategy yielding subsequent sleeps.
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}
128
impl TwoPhaseCommitHandler {
    /// Creates a handler with empty queues and no backoff in progress.
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    /// Infinite retry schedule: exponential backoff from 10ms, capped at 60s,
    /// with jitter; each delay is materialized as a boxed sleep future.
    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

    /// Waits until some item is ready for a phase-two commit and returns a
    /// clone of it (the item stays at the queue front until `ack_committed`).
    /// While waiting, drains hummock committed-epoch notifications and
    /// promotes pending items whose epoch is now covered.
    async fn next_to_commit(
        &mut self,
    ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
        loop {
            // Resolves immediately when something is prepared and no backoff
            // is active; waits out the backoff otherwise; never resolves when
            // there is nothing prepared.
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let item = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok(item);
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    // Promote every pending item whose epoch is now hummock-committed.
                    while let Some((epoch, metadata, schema_change)) = self.pending_epochs.pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch) {
                        if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                            assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
                        }
                        self.prepared_epochs.push_back((epoch, metadata, schema_change));
                    }
                }
            }
        }
    }

    /// Enqueues a newly pre-committed item, routing it to `prepared_epochs`
    /// directly when its epoch is already hummock-committed, otherwise to
    /// `pending_epochs`. Maintains strictly increasing epoch order.
    fn push_new_item(
        &mut self,
        epoch: u64,
        metadata: Option<Vec<u8>>,
        schema_change: Option<PbSinkSchemaChange>,
    ) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs
                .push_back((epoch, metadata, schema_change));
        } else {
            // Already covered by hummock; nothing may still be pending, or the
            // epoch ordering invariant would be violated.
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs
                .push_back((epoch, metadata, schema_change));
        }
    }

    /// Marks the front prepared item as committed: clears any backoff, pops
    /// it, persists the committed state, and prunes older records in the
    /// meta store.
    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

    /// Records a failed phase-two commit for the front prepared item and arms
    /// (or advances) the retry backoff; the item remains queued for retry.
    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch,);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            // Already retrying: move to the next (longer) delay.
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            // First failure: start a fresh backoff schedule.
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, Retrying after backoff",
            epoch,
        );
    }

    /// True when no item is awaiting either promotion or phase-two commit.
    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }

    /// Whether the most recently queued item carries a schema change.
    /// NOTE(review): only the tail of `pending_epochs` (or, if that queue is
    /// empty, the tail of `prepared_epochs`) is inspected — this assumes a
    /// schema change can only sit at the tail of the queues; confirm that
    /// invariant against the callers before relying on it.
    fn has_uncommitted_schema_change(&self) -> bool {
        if let Some((_, _, schema_change)) = self.pending_epochs.back() {
            schema_change.is_some()
        } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
            schema_change.is_some()
        } else {
            false
        }
    }
}
267
/// Owns the per-writer coordination handles of one sink and multiplexes their
/// requests into a single event stream for the coordinator worker.
struct CoordinationHandleManager {
    param: SinkParam,
    // Live writer handles keyed by locally assigned id.
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    // Monotonic counter used to assign `HandleId`s to newly connected writers.
    next_handle_id: HandleId,
    // Stream of newly connecting writer handles.
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}
274
impl CoordinationHandleManager {
    /// Sends the start signal (with an optional log-store rewind epoch) to the
    /// given handles. Errors if a handle id is unknown or the send fails.
    fn start(
        &mut self,
        log_store_rewind_start_epoch: Option<u64>,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self
                .writer_handles
                .get_mut(&handle_id)
                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
            handle.start(log_store_rewind_start_epoch).map_err(|_| {
                anyhow!(
                    "fail to start {:?} for handle {}",
                    log_store_rewind_start_epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    /// Broadcasts the agreed-upon initial epoch to all writer handles.
    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
        for (handle_id, handle) in &mut self.writer_handles {
            handle
                .ack_aligned_initial_epoch(aligned_initial_epoch)
                .map_err(|_| {
                    anyhow!(
                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
                        aligned_initial_epoch,
                        handle_id
                    )
                })?;
        }
        Ok(())
    }

    /// Acknowledges a committed epoch to the given subset of handles (the
    /// ones that participated in that epoch's commit).
    fn ack_commit(
        &mut self,
        epoch: u64,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
                anyhow!(
                    "fail to find handle for {} when ack commit on epoch {}",
                    handle_id,
                    epoch
                )
            })?;
            handle.ack_commit(epoch).map_err(|_| {
                anyhow!(
                    "fail to ack commit on epoch {} for handle {}",
                    epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    /// Polls all writer handles and yields the first ready request together
    /// with its handle id.
    /// NOTE(review): iteration order over the `HashMap` decides which ready
    /// handle wins each poll; `HashMap`'s randomized order makes this
    /// effectively unbiased, but there is no per-handle fairness guarantee.
    async fn next_request_inner(
        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
        poll_fn(|cx| {
            for (handle_id, handle) in writer_handles.iter_mut() {
                if let Poll::Ready(result) = handle.poll_next_request(cx) {
                    return Poll::Ready(result.map(|request| (*handle_id, request)));
                }
            }
            Poll::Pending
        })
        .await
    }
}
350
/// Events surfaced by `CoordinationHandleManager::next_event`, either a newly
/// connected writer or a request decoded from an existing handle.
enum CoordinationHandleManagerEvent {
    /// A new writer handle connected and was registered.
    NewHandle,
    /// A writer reported a vnode bitmap update (scale in/out).
    UpdateVnodeBitmap,
    /// A writer asked to stop; its handle should be removed.
    Stop,
    /// A writer submitted its part of an epoch's commit.
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        schema_change: Option<PbSinkSchemaChange>,
    },
    /// A writer proposed its initial epoch during startup alignment.
    AlignInitialEpoch(u64),
}
362
363impl CoordinationHandleManagerEvent {
364 fn name(&self) -> &'static str {
365 match self {
366 CoordinationHandleManagerEvent::NewHandle => "NewHandle",
367 CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
368 CoordinationHandleManagerEvent::Stop => "Stop",
369 CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
370 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
371 }
372 }
373}
374
impl CoordinationHandleManager {
    /// Waits for the next event: either a new writer connecting on
    /// `request_rx` or a request from an already-registered handle.
    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
        select! {
            handle = self.request_rx.recv() => {
                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
                // A mismatched param is logged but tolerated.
                if handle.param() != &self.param {
                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
                }
                let handle_id = self.next_handle_id;
                self.next_handle_id += 1;
                self.writer_handles.insert(handle_id, handle);
                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
            }
            result = Self::next_request_inner(&mut self.writer_handles) => {
                let (handle_id, request) = result?;
                let event = match request {
                    coordinate_request::Msg::CommitRequest(request) => {
                        CoordinationHandleManagerEvent::CommitRequest {
                            epoch: request.epoch,
                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
                            schema_change: request.schema_change,
                        }
                    }
                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
                    }
                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
                    }
                    coordinate_request::Msg::Stop(_) => {
                        CoordinationHandleManagerEvent::Stop
                    }
                    coordinate_request::Msg::StartRequest(_) => {
                        // Start requests are consumed before a handle reaches
                        // this manager.
                        unreachable!("should have been handled");
                    }
                };
                Ok((handle_id, event))
            }
        }
    }

    /// Current vnode bitmap of a registered handle. Panics on unknown ids.
    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
        self.writer_handles[&handle_id].vnode_bitmap()
    }

    /// Removes the handle and signals it to stop.
    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
        self.writer_handles
            .remove(&handle_id)
            .expect("should exist")
            .stop()
    }

    /// Initial connect phase: accepts new handles until their vnode bitmaps
    /// jointly cover all vnodes. Any other event during this phase is an
    /// error. Returns the ids of the connected handles.
    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
        assert!(self.writer_handles.is_empty());
        let mut init_requests = AligningRequests::default();
        while !init_requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            let unexpected_event = match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                    continue;
                }
                event => event.name(),
            };
            return Err(anyhow!(
                "expect new handle during init, but got {}",
                unexpected_event
            ));
        }
        Ok(init_requests.handle_ids)
    }

    /// Re-alignment after a parallelism change: starting from the handles in
    /// `altered_handles`, keeps processing NewHandle / UpdateVnodeBitmap /
    /// Stop events until all remaining handles have reported and the bitmaps
    /// cover every vnode again. Commit or align requests mid-change are
    /// protocol errors. Returns the surviving handle ids.
    async fn alter_parallelisms(
        &mut self,
        altered_handles: impl Iterator<Item = HandleId>,
    ) -> anyhow::Result<HashSet<HandleId>> {
        let mut requests = AligningRequests::default();
        for handle_id in altered_handles {
            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
        }
        // Handles that have not yet reported their post-change state.
        let mut remaining_handles: HashSet<_> = self
            .writer_handles
            .keys()
            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
            .cloned()
            .collect();
        while !remaining_handles.is_empty() || !requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    assert!(remaining_handles.remove(&handle_id));
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::Stop => {
                    assert!(remaining_handles.remove(&handle_id));
                    self.stop_handle(handle_id)?;
                }
                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
                    bail!(
                        "receive commit request on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
                    bail!(
                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
            }
        }
        Ok(requests.handle_ids)
    }
}
494
/// Coordinator lifecycle: normally `Running`; after a schema change is queued
/// the worker waits for all pre-committed items to flush before (re)starting
/// the given set of handles.
enum CoordinatorWorkerState {
    Running,
    WaitingForFlushed(HashSet<HandleId>),
}
504
/// Top-level per-sink coordination worker: owns the handle manager, remembers
/// the last epoch committed to the downstream sink, and tracks whether it is
/// running or draining before a schema change.
pub struct CoordinatorWorker {
    handle_manager: CoordinationHandleManager,
    // Last epoch fully committed downstream; used as log-store rewind point.
    prev_committed_epoch: Option<u64>,
    curr_state: CoordinatorWorkerState,
}
510
/// Internal event of the worker loop: either a handle-manager event or a
/// pre-committed item that is now ready for its phase-two commit.
enum CoordinatorWorkerEvent {
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
}
515
impl CoordinatorWorker {
    /// Entry point: builds the sink and its commit coordinator from `param`,
    /// then runs the coordination loop. Build errors are logged and the
    /// worker exits without serving.
    #[allow(clippy::large_stack_frames)]
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        dispatch_sink!(sink, sink, {
            let coordinator =
                match Box::pin(sink.new_coordinator(Some(iceberg_compact_stat_sender))).await {
                    Ok(coordinator) => coordinator,
                    Err(e) => {
                        error!(
                            error = %e.as_report(),
                            "unable to build coordinator with param {:?}",
                            param
                        );
                        return;
                    }
                };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }

    /// Runs the coordination loop with an already-built coordinator; on
    /// failure, aborts every still-connected writer handle with the error.
    pub async fn execute_coordinator(
        db: DatabaseConnection,
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) {
        let mut worker = CoordinatorWorker {
            handle_manager: CoordinationHandleManager {
                param,
                writer_handles: HashMap::new(),
                next_handle_id: 0,
                request_rx,
            },
            prev_committed_epoch: None,
            curr_state: CoordinatorWorkerState::Running,
        };

        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
            for handle in worker.handle_manager.writer_handles.into_values() {
                handle.abort(Status::internal(format!(
                    "failed to run coordination: {:?}",
                    e.as_report()
                )))
            }
        }
    }

    /// Starts the given handles immediately, unless a schema change is still
    /// uncommitted — in that case the start is deferred by switching to
    /// `WaitingForFlushed` until the two-phase queues drain.
    async fn try_handle_init_requests(
        &mut self,
        pending_handle_ids: &HashSet<HandleId>,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<()> {
        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
        if two_phase_handler.has_uncommitted_schema_change() {
            self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
        } else {
            self.handle_init_requests_impl(pending_handle_ids.clone())
                .await?;
        }
        Ok(())
    }

    /// Starts the handles with the current rewind epoch. When there is no
    /// previous committed epoch (fresh start), additionally runs the initial
    /// epoch alignment: collects each writer's proposed epoch, takes the
    /// maximum, and acknowledges it to all handles.
    async fn handle_init_requests_impl(
        &mut self,
        pending_handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        let log_store_rewind_start_epoch = self.prev_committed_epoch;
        self.handle_manager
            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
        if log_store_rewind_start_epoch.is_none() {
            let mut align_requests = AligningRequests::default();
            while !align_requests.aligned() {
                let (handle_id, event) = self.handle_manager.next_event().await?;
                match event {
                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
                        align_requests.add_new_request(
                            handle_id,
                            initial_epoch,
                            self.handle_manager.vnode_bitmap(handle_id),
                        )?;
                    }
                    other => {
                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
                    }
                }
            }
            // All writers must resume from the same epoch: pick the largest.
            let aligned_initial_epoch = align_requests
                .requests
                .into_iter()
                .max()
                .expect("non-empty");
            self.handle_manager
                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
        }
        Ok(())
    }

    /// Waits for the next worker event. If we were waiting for a flush and
    /// the two-phase queues have drained, finish the deferred handle start
    /// first and return to `Running`.
    async fn next_event(
        &mut self,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<CoordinatorWorkerEvent> {
        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
            && two_phase_handler.is_empty()
        {
            let pending_handle_ids = pending_handle_ids.clone();
            self.handle_init_requests_impl(pending_handle_ids).await?;
            self.curr_state = CoordinatorWorkerState::Running;
        }

        select! {
            next_handle_event = self.handle_manager.next_event() => {
                let (handle_id, event) = next_handle_event?;
                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
            }

            next_item_to_commit = two_phase_handler.next_to_commit() => {
                let (epoch, metadata, schema_change) = next_item_to_commit?;
                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
            }
        }
    }

    /// Main loop: recovers state from the meta store, waits for all writers,
    /// then serves events until an error occurs.
    ///
    /// Commit flow: per-epoch commit requests are aligned across writers via
    /// vnode bitmaps; once the smallest pending epoch is fully aligned it is
    /// pre-committed (single-phase: committed directly). Pre-committed items
    /// are handed to `two_phase_handler`, which later yields `ReadyToCommit`
    /// events for the phase-two commit with retry/backoff.
    async fn run_coordination(
        &mut self,
        db: DatabaseConnection,
        mut coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) -> anyhow::Result<()> {
        let sink_id = self.handle_manager.param.sink_id;

        let mut two_phase_handler = self
            .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
            .await?;
        match &mut coordinator {
            SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
            SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
        }

        let mut running_handles = self.handle_manager.wait_init_handles().await?;
        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
            .await?;

        // Commit requests grouped by epoch, each awaiting vnode alignment.
        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
        // Handles that connected mid-run; absorbed on the next parallelism change.
        let mut pending_new_handles = vec![];
        loop {
            let event = self.next_event(&mut two_phase_handler).await?;
            let (handle_id, epoch, commit_request) = match event {
                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
                    CoordinationHandleManagerEvent::NewHandle => {
                        pending_new_handles.push(handle_id);
                        continue;
                    }
                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;
                        continue;
                    }
                    CoordinationHandleManagerEvent::Stop => {
                        self.handle_manager.stop_handle(handle_id)?;
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;

                        continue;
                    }
                    CoordinationHandleManagerEvent::CommitRequest {
                        epoch,
                        metadata,
                        schema_change,
                    } => (handle_id, epoch, (metadata, schema_change)),
                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
                        bail!("receive AlignInitialEpoch after initialization")
                    }
                },
                // Phase-two commit of an already pre-committed item.
                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
                    let start_time = Instant::now();
                    let commit_fut = async {
                        match &mut coordinator {
                            SinkCommitCoordinator::SinglePhase(coordinator) => {
                                // Single-phase items reach here only for
                                // schema changes; data was committed eagerly.
                                assert!(metadata.is_none());
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "single_phase_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                            SinkCommitCoordinator::TwoPhase(coordinator) => {
                                if let Some(metadata) = metadata {
                                    coordinator
                                        .commit_data(epoch, metadata)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_data",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                        }
                        Ok(())
                    };
                    // Log every 5s while the commit is in flight.
                    let commit_res =
                        run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
                            warn!(
                                elapsed = ?start_time.elapsed(),
                                %sink_id,
                                "committing"
                            );
                        })
                        .await;

                    match commit_res {
                        Ok(_) => {
                            two_phase_handler.ack_committed(epoch).await?;
                            self.prev_committed_epoch = Some(epoch);
                        }
                        Err(e) => {
                            // Leaves the item queued; handler arms retry backoff.
                            two_phase_handler.failed_committed(epoch, e);
                        }
                    }

                    continue;
                }
            };
            if !running_handles.contains(&handle_id) {
                bail!(
                    "receiving commit request from non-running handle {}, running handles: {:?}",
                    handle_id,
                    running_handles
                );
            }
            pending_epochs.entry(epoch).or_default().add_new_request(
                handle_id,
                commit_request,
                self.handle_manager.vnode_bitmap(handle_id),
            )?;
            // Only the smallest epoch may proceed, and only once every vnode
            // has reported for it.
            if pending_epochs
                .first_key_value()
                .expect("non-empty")
                .1
                .aligned()
            {
                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
                let mut requests = commit_requests.requests.into_iter();
                let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
                metadatas.push(first_metadata);
                // All writers must agree on the schema change for this epoch.
                for (metadata, schema_change) in requests {
                    if first_schema_change != schema_change {
                        return Err(anyhow!(
                            "got different schema change {:?} to prev schema change {:?}",
                            schema_change,
                            first_schema_change
                        ));
                    }
                    metadatas.push(metadata);
                }

                match &mut coordinator {
                    SinkCommitCoordinator::SinglePhase(coordinator) => {
                        // Data commits happen immediately; only a schema
                        // change goes through the persisted two-phase path.
                        if !metadatas.is_empty() {
                            let start_time = Instant::now();
                            run_future_with_periodic_fn(
                                coordinator.commit_data(epoch, metadatas).instrument_await(
                                    Self::commit_span("single_phase_commit_data", sink_id, epoch),
                                ),
                                Duration::from_secs(5),
                                || {
                                    warn!(
                                        elapsed = ?start_time.elapsed(),
                                        %sink_id,
                                        "committing"
                                    );
                                },
                            )
                            .await
                            .map_err(|e| anyhow!(e))?;
                        }
                        if first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                None,
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(epoch, None, first_schema_change);
                        } else {
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                    SinkCommitCoordinator::TwoPhase(coordinator) => {
                        // Phase one: pre-commit and persist so the commit can
                        // be resumed after a meta-node restart.
                        let commit_metadata = coordinator
                            .pre_commit(epoch, metadatas, first_schema_change.clone())
                            .instrument_await(Self::commit_span(
                                "two_phase_pre_commit",
                                sink_id,
                                epoch,
                            ))
                            .await?;
                        if commit_metadata.is_some() || first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                commit_metadata.clone(),
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(
                                epoch,
                                commit_metadata,
                                first_schema_change,
                            );
                        } else {
                            // Nothing to commit downstream for this epoch.
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                }

                self.handle_manager
                    .ack_commit(epoch, commit_requests.handle_ids)?;
            }
        }
    }

    /// Recovers two-phase state from the meta store: reads the records
    /// ordered by epoch, restores the last committed epoch, re-queues
    /// pending (pre-committed but uncommitted) items, aborts any aborted
    /// records with the coordinator, and cleans them up.
    async fn init_state_from_store(
        &mut self,
        db: &DatabaseConnection,
        sink_id: SinkId,
        subscriber: SinkCommittedEpochSubscriber,
        coordinator: &mut SinkCommitCoordinator,
    ) -> anyhow::Result<TwoPhaseCommitHandler> {
        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;

        // Expected record order: at most one Committed, then Pending items,
        // then Aborted items.
        let mut metadata_iter = ordered_metadata.into_iter().peekable();
        let last_committed_epoch = metadata_iter
            .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
            .map(|(epoch, _, _, _)| epoch);
        self.prev_committed_epoch = last_committed_epoch;

        let pending_items = metadata_iter
            .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
            .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
            .collect_vec();

        let mut aborted_epochs = vec![];

        for (epoch, state, metadata, _) in metadata_iter {
            match state {
                SinkState::Aborted => {
                    if let Some(metadata) = metadata
                        && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
                    {
                        coordinator.abort(epoch, metadata).await;
                    }
                    aborted_epochs.push(epoch);
                }
                other => {
                    unreachable!(
                        "unexpected state {:?} after pending items at epoch {}",
                        other, epoch
                    );
                }
            }
        }

        clean_aborted_records(db, sink_id, aborted_epochs).await?;

        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
        let mut two_phase_handler = TwoPhaseCommitHandler::new(
            db.clone(),
            sink_id,
            initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
        );

        for (epoch, metadata, schema_change) in pending_items {
            two_phase_handler.push_new_item(epoch, metadata, schema_change);
        }

        Ok(two_phase_handler)
    }

    /// Builds a long-running await-tree span labelling one commit stage.
    fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
        await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
    }
}