1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::{Either, pending, select};
25use futures::pin_mut;
26use itertools::Itertools;
27use risingwave_common::bail;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::SinkId;
32use risingwave_connector::sink::{
33 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
34};
35use risingwave_meta_model::pending_sink_state::SinkState;
36use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
37use risingwave_pb::stream_plan::PbSinkSchemaChange;
38use sea_orm::DatabaseConnection;
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::time::sleep;
43use tokio_retry::strategy::{ExponentialBackoff, jitter};
44use tonic::Status;
45use tracing::{error, warn};
46
47use crate::manager::sink_coordination::exactly_once_util::{
48 clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
49 persist_pre_commit_metadata,
50};
51use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
52
53async fn run_future_with_periodic_fn<F: Future>(
54 future: F,
55 interval: Duration,
56 mut f: impl FnMut(),
57) -> F::Output {
58 pin_mut!(future);
59 loop {
60 match select(&mut future, pin!(sleep(interval))).await {
61 Either::Left((output, _)) => {
62 break output;
63 }
64 Either::Right(_) => f(),
65 }
66 }
67}
68
/// Identifier assigned to each writer handle registered with [`CoordinationHandleManager`].
type HandleId = usize;
70
/// Accumulates one request per writer handle until the vnode bitmaps of the contributing
/// handles, combined, have every bit set.
#[derive(Default)]
struct AligningRequests<R> {
    /// Requests accepted so far, in arrival order.
    requests: Vec<R>,
    /// Handles that have already contributed a request.
    handle_ids: HashSet<HandleId>,
    /// Union of the vnode bitmaps of all accepted requests; `None` until the first request.
    committed_bitmap: Option<Bitmap>,
}
77
78impl<R> AligningRequests<R> {
79 fn add_new_request(
80 &mut self,
81 handle_id: HandleId,
82 request: R,
83 vnode_bitmap: &Bitmap,
84 ) -> anyhow::Result<()>
85 where
86 R: Debug,
87 {
88 let committed_bitmap = self
89 .committed_bitmap
90 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
91 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
92
93 let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
94 if check_bitmap.count_ones() > 0 {
95 return Err(anyhow!(
96 "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
97 check_bitmap.iter_ones().collect_vec(),
98 vnode_bitmap,
99 committed_bitmap,
100 self.requests,
101 request
102 ));
103 }
104 *committed_bitmap |= vnode_bitmap;
105 self.requests.push(request);
106 assert!(self.handle_ids.insert(handle_id));
107 Ok(())
108 }
109
110 fn aligned(&self) -> bool {
111 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
112 }
113}
114
/// Boxed, pinned sleep that is awaited before a failed commit is retried.
type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
/// Opaque iterator yielding successive [`RetryBackoffFuture`]s; its concrete type is defined
/// by `TwoPhaseCommitHandler::get_retry_backoff_strategy`.
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;
117
/// Tracks pre-committed items and decides when each of them may be committed, retrying with
/// backoff after a failed commit.
///
/// Each queued item is `(epoch, commit metadata, schema change)`.
struct TwoPhaseCommitHandler {
    /// Connection handed to `commit_and_prune_epoch` when a commit is acknowledged.
    db: DatabaseConnection,
    sink_id: SinkId,
    /// Most recent epoch received from `job_committed_epoch_rx` (or the initial value).
    curr_hummock_committed_epoch: u64,
    /// Stream of committed epochs; its closure is treated as an error.
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    /// Epoch of the latest acknowledged commit, if any.
    last_committed_epoch: Option<u64>,
    /// Items whose epoch is still greater than `curr_hummock_committed_epoch`.
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    /// Items whose epoch is not greater than `curr_hummock_committed_epoch`; the front is the
    /// next one handed out by `next_to_commit`.
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    /// Current backoff sleep and the strategy producing the next ones; set by
    /// `failed_committed` and cleared by `ack_committed`.
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}
128
129impl TwoPhaseCommitHandler {
130 fn new(
131 db: DatabaseConnection,
132 sink_id: SinkId,
133 initial_hummock_committed_epoch: u64,
134 job_committed_epoch_rx: UnboundedReceiver<u64>,
135 last_committed_epoch: Option<u64>,
136 ) -> Self {
137 Self {
138 db,
139 sink_id,
140 curr_hummock_committed_epoch: initial_hummock_committed_epoch,
141 job_committed_epoch_rx,
142 last_committed_epoch,
143 pending_epochs: VecDeque::new(),
144 prepared_epochs: VecDeque::new(),
145 backoff_state: None,
146 }
147 }
148
149 #[define_opaque(RetryBackoffStrategy)]
150 fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
151 ExponentialBackoff::from_millis(10)
152 .max_delay(Duration::from_secs(60))
153 .map(jitter)
154 .map(|delay| Box::pin(tokio::time::sleep(delay)))
155 }
156
157 async fn next_to_commit(
158 &mut self,
159 ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
160 loop {
161 let wait_backoff = async {
162 if self.prepared_epochs.is_empty() {
163 pending::<()>().await;
164 } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
165 backoff_fut.await;
166 }
167 };
168
169 select! {
170 _ = wait_backoff => {
171 let item = self.prepared_epochs.front().cloned().expect("non-empty");
172 return Ok(item);
173 }
174
175 recv_epoch = self.job_committed_epoch_rx.recv() => {
176 let Some(recv_epoch) = recv_epoch else {
177 return Err(anyhow!(
178 "Hummock committed epoch sender closed unexpectedly"
179 ));
180 };
181 self.curr_hummock_committed_epoch = recv_epoch;
182 while let Some((epoch, metadata, schema_change)) = self.pending_epochs.pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch) {
183 if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
184 assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
185 }
186 self.prepared_epochs.push_back((epoch, metadata, schema_change));
187 }
188 }
189 }
190 }
191 }
192
193 fn push_new_item(
194 &mut self,
195 epoch: u64,
196 metadata: Option<Vec<u8>>,
197 schema_change: Option<PbSinkSchemaChange>,
198 ) {
199 if epoch > self.curr_hummock_committed_epoch {
200 if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
201 assert!(
202 epoch > *last_epoch,
203 "pending epochs must be in increasing order"
204 );
205 }
206 self.pending_epochs
207 .push_back((epoch, metadata, schema_change));
208 } else {
209 assert!(self.pending_epochs.is_empty());
210 if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
211 assert!(
212 epoch > *last_epoch,
213 "prepared epochs must be in increasing order"
214 );
215 }
216 self.prepared_epochs
217 .push_back((epoch, metadata, schema_change));
218 }
219 }
220
221 async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
222 self.backoff_state = None;
223 let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
224 assert_eq!(last_epoch, epoch);
225
226 commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
227 self.last_committed_epoch = Some(epoch);
228 Ok(())
229 }
230
231 fn failed_committed(&mut self, epoch: u64, err: SinkError) {
232 assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch,);
233 if let Some((prev_fut, strategy)) = &mut self.backoff_state {
234 let new_fut = strategy.next().expect("infinite");
235 *prev_fut = new_fut;
236 } else {
237 let mut strategy = Self::get_retry_backoff_strategy();
238 let backoff_fut = strategy.next().expect("infinite");
239 self.backoff_state = Some((backoff_fut, strategy));
240 }
241 tracing::error!(
242 error = %err.as_report(),
243 %self.sink_id,
244 "failed to commit epoch {}, Retrying after backoff",
245 epoch,
246 );
247 }
248
249 fn is_empty(&self) -> bool {
250 self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
251 }
252
253 fn has_uncommitted_schema_change(&self) -> bool {
258 if let Some((_, _, schema_change)) = self.pending_epochs.back() {
259 schema_change.is_some()
260 } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
261 schema_change.is_some()
262 } else {
263 false
264 }
265 }
266}
267
/// Owns the writer handles of a sink and multiplexes their requests.
struct CoordinationHandleManager {
    /// Sink parameter that newly registered handles are compared against.
    param: SinkParam,
    /// Registered writer handles keyed by the id assigned at registration.
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    /// Id that will be assigned to the next registered handle.
    next_handle_id: HandleId,
    /// Channel delivering newly created writer handles.
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}
274
275impl CoordinationHandleManager {
276 fn start(
277 &mut self,
278 log_store_rewind_start_epoch: Option<u64>,
279 handle_ids: impl IntoIterator<Item = HandleId>,
280 ) -> anyhow::Result<()> {
281 for handle_id in handle_ids {
282 let handle = self
283 .writer_handles
284 .get_mut(&handle_id)
285 .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
286 handle.start(log_store_rewind_start_epoch).map_err(|_| {
287 anyhow!(
288 "fail to start {:?} for handle {}",
289 log_store_rewind_start_epoch,
290 handle_id
291 )
292 })?;
293 }
294 Ok(())
295 }
296
297 fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
298 for (handle_id, handle) in &mut self.writer_handles {
299 handle
300 .ack_aligned_initial_epoch(aligned_initial_epoch)
301 .map_err(|_| {
302 anyhow!(
303 "fail to ack_aligned_initial_epoch {:?} for handle {}",
304 aligned_initial_epoch,
305 handle_id
306 )
307 })?;
308 }
309 Ok(())
310 }
311
312 fn ack_commit(
313 &mut self,
314 epoch: u64,
315 handle_ids: impl IntoIterator<Item = HandleId>,
316 ) -> anyhow::Result<()> {
317 for handle_id in handle_ids {
318 let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
319 anyhow!(
320 "fail to find handle for {} when ack commit on epoch {}",
321 handle_id,
322 epoch
323 )
324 })?;
325 handle.ack_commit(epoch).map_err(|_| {
326 anyhow!(
327 "fail to ack commit on epoch {} for handle {}",
328 epoch,
329 handle_id
330 )
331 })?;
332 }
333 Ok(())
334 }
335
336 async fn next_request_inner(
337 writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
338 ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
339 poll_fn(|cx| {
340 for (handle_id, handle) in writer_handles.iter_mut() {
341 if let Poll::Ready(result) = handle.poll_next_request(cx) {
342 return Poll::Ready(result.map(|request| (*handle_id, request)));
343 }
344 }
345 Poll::Pending
346 })
347 .await
348 }
349}
350
/// Event produced by `CoordinationHandleManager::next_event`, always paired with the id of the
/// handle it concerns.
enum CoordinationHandleManagerEvent {
    /// A new writer handle has been registered.
    NewHandle,
    /// The handle sent an `UpdateVnodeRequest`.
    UpdateVnodeBitmap,
    /// The handle sent a `Stop` message.
    Stop,
    /// The handle asked to commit `epoch`.
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        schema_change: Option<PbSinkSchemaChange>,
    },
    /// The handle reported its initial epoch for alignment.
    AlignInitialEpoch(u64),
}
362
363impl CoordinationHandleManagerEvent {
364 fn name(&self) -> &'static str {
365 match self {
366 CoordinationHandleManagerEvent::NewHandle => "NewHandle",
367 CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
368 CoordinationHandleManagerEvent::Stop => "Stop",
369 CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
370 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
371 }
372 }
373}
374
375impl CoordinationHandleManager {
376 async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
377 select! {
378 handle = self.request_rx.recv() => {
379 let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
380 if handle.param() != &self.param {
381 warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
382 }
383 let handle_id = self.next_handle_id;
384 self.next_handle_id += 1;
385 self.writer_handles.insert(handle_id, handle);
386 Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
387 }
388 result = Self::next_request_inner(&mut self.writer_handles) => {
389 let (handle_id, request) = result?;
390 let event = match request {
391 coordinate_request::Msg::CommitRequest(request) => {
392 CoordinationHandleManagerEvent::CommitRequest {
393 epoch: request.epoch,
394 metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
395 schema_change: request.schema_change,
396 }
397 }
398 coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
399 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
400 }
401 coordinate_request::Msg::UpdateVnodeRequest(_) => {
402 CoordinationHandleManagerEvent::UpdateVnodeBitmap
403 }
404 coordinate_request::Msg::Stop(_) => {
405 CoordinationHandleManagerEvent::Stop
406 }
407 coordinate_request::Msg::StartRequest(_) => {
408 unreachable!("should have been handled");
409 }
410 };
411 Ok((handle_id, event))
412 }
413 }
414 }
415
416 fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
417 self.writer_handles[&handle_id].vnode_bitmap()
418 }
419
420 fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
421 self.writer_handles
422 .remove(&handle_id)
423 .expect("should exist")
424 .stop()
425 }
426
427 async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
428 assert!(self.writer_handles.is_empty());
429 let mut init_requests = AligningRequests::default();
430 while !init_requests.aligned() {
431 let (handle_id, event) = self.next_event().await?;
432 let unexpected_event = match event {
433 CoordinationHandleManagerEvent::NewHandle => {
434 init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
435 continue;
436 }
437 event => event.name(),
438 };
439 return Err(anyhow!(
440 "expect new handle during init, but got {}",
441 unexpected_event
442 ));
443 }
444 Ok(init_requests.handle_ids)
445 }
446
447 async fn alter_parallelisms(
448 &mut self,
449 altered_handles: impl Iterator<Item = HandleId>,
450 ) -> anyhow::Result<HashSet<HandleId>> {
451 let mut requests = AligningRequests::default();
452 for handle_id in altered_handles {
453 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
454 }
455 let mut remaining_handles: HashSet<_> = self
456 .writer_handles
457 .keys()
458 .filter(|handle_id| !requests.handle_ids.contains(handle_id))
459 .cloned()
460 .collect();
461 while !remaining_handles.is_empty() || !requests.aligned() {
462 let (handle_id, event) = self.next_event().await?;
463 match event {
464 CoordinationHandleManagerEvent::NewHandle => {
465 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
466 }
467 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
468 assert!(remaining_handles.remove(&handle_id));
469 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
470 }
471 CoordinationHandleManagerEvent::Stop => {
472 assert!(remaining_handles.remove(&handle_id));
473 self.stop_handle(handle_id)?;
474 }
475 CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
476 bail!(
477 "receive commit request on epoch {} from handle {} during alter parallelism",
478 epoch,
479 handle_id
480 );
481 }
482 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
483 bail!(
484 "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
485 epoch,
486 handle_id
487 );
488 }
489 }
490 }
491 Ok(requests.handle_ids)
492 }
493}
494
/// Whether the worker has started its current writer set yet.
enum CoordinatorWorkerState {
    /// No deferred start is outstanding.
    Running,
    /// Starting the contained handles is deferred until the two-phase handler holds no more
    /// uncommitted items.
    WaitingForFlushed(HashSet<HandleId>),
}
504
/// Coordinates the commits of all writers of a single sink.
pub struct CoordinatorWorker {
    /// Registered writer handles and their incoming requests.
    handle_manager: CoordinationHandleManager,
    /// Latest epoch acknowledged to the writers (or recovered from the store at start-up);
    /// passed to handles as the log store rewind start epoch when they are started.
    last_writer_acked_epoch: Option<u64>,
    /// Whether a deferred start of writer handles is outstanding.
    curr_state: CoordinatorWorkerState,
}
515
/// Event handled by the main loop of `CoordinatorWorker::run_coordination`.
enum CoordinatorWorkerEvent {
    /// An event from the handle manager, tagged with the originating handle.
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    /// A pre-committed item `(epoch, metadata, schema change)` is ready to be committed.
    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
}
520
521impl CoordinatorWorker {
522 #[expect(clippy::large_stack_frames)]
523 pub async fn run(
524 param: SinkParam,
525 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
526 db: DatabaseConnection,
527 subscriber: SinkCommittedEpochSubscriber,
528 iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
529 ) {
530 let sink = match build_sink(param.clone()) {
531 Ok(sink) => sink,
532 Err(e) => {
533 error!(
534 error = %e.as_report(),
535 "unable to build sink with param {:?}",
536 param
537 );
538 return;
539 }
540 };
541
542 dispatch_sink!(sink, sink, {
543 let coordinator =
544 match Box::pin(sink.new_coordinator(Some(iceberg_compact_stat_sender))).await {
545 Ok(coordinator) => coordinator,
546 Err(e) => {
547 error!(
548 error = %e.as_report(),
549 "unable to build coordinator with param {:?}",
550 param
551 );
552 return;
553 }
554 };
555 Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
556 });
557 }
558
559 pub async fn execute_coordinator(
560 db: DatabaseConnection,
561 param: SinkParam,
562 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
563 coordinator: SinkCommitCoordinator,
564 subscriber: SinkCommittedEpochSubscriber,
565 ) {
566 let mut worker = CoordinatorWorker {
567 handle_manager: CoordinationHandleManager {
568 param,
569 writer_handles: HashMap::new(),
570 next_handle_id: 0,
571 request_rx,
572 },
573 last_writer_acked_epoch: None,
574 curr_state: CoordinatorWorkerState::Running,
575 };
576
577 if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
578 for handle in worker.handle_manager.writer_handles.into_values() {
579 handle.abort(Status::internal(format!(
580 "failed to run coordination: {:?}",
581 e.as_report()
582 )))
583 }
584 }
585 }
586
587 async fn try_handle_init_requests(
588 &mut self,
589 pending_handle_ids: &HashSet<HandleId>,
590 two_phase_handler: &mut TwoPhaseCommitHandler,
591 ) -> anyhow::Result<()> {
592 assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
593 if two_phase_handler.has_uncommitted_schema_change() {
594 self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
596 } else {
597 self.handle_init_requests_impl(pending_handle_ids.clone())
598 .await?;
599 }
600 Ok(())
601 }
602
603 async fn handle_init_requests_impl(
604 &mut self,
605 pending_handle_ids: impl IntoIterator<Item = HandleId>,
606 ) -> anyhow::Result<()> {
607 let log_store_rewind_start_epoch = self.last_writer_acked_epoch;
608 self.handle_manager
609 .start(log_store_rewind_start_epoch, pending_handle_ids)?;
610 if log_store_rewind_start_epoch.is_none() {
611 let mut align_requests = AligningRequests::default();
612 while !align_requests.aligned() {
613 let (handle_id, event) = self.handle_manager.next_event().await?;
614 match event {
615 CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
616 align_requests.add_new_request(
617 handle_id,
618 initial_epoch,
619 self.handle_manager.vnode_bitmap(handle_id),
620 )?;
621 }
622 other => {
623 return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
624 }
625 }
626 }
627 let aligned_initial_epoch = align_requests
628 .requests
629 .into_iter()
630 .max()
631 .expect("non-empty");
632 self.handle_manager
633 .ack_aligned_initial_epoch(aligned_initial_epoch)?;
634 }
635 Ok(())
636 }
637
638 async fn next_event(
639 &mut self,
640 two_phase_handler: &mut TwoPhaseCommitHandler,
641 ) -> anyhow::Result<CoordinatorWorkerEvent> {
642 if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
643 && two_phase_handler.is_empty()
644 {
645 let pending_handle_ids = pending_handle_ids.clone();
646 self.handle_init_requests_impl(pending_handle_ids).await?;
647 self.curr_state = CoordinatorWorkerState::Running;
648 }
649
650 select! {
651 next_handle_event = self.handle_manager.next_event() => {
652 let (handle_id, event) = next_handle_event?;
653 Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
654 }
655
656 next_item_to_commit = two_phase_handler.next_to_commit() => {
657 let (epoch, metadata, schema_change) = next_item_to_commit?;
658 Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
659 }
660 }
661 }
662
/// Main coordination loop of the worker.
///
/// Restores the commit state from the store, initializes the coordinator, waits for the
/// initial writer set, and then processes events forever: parallelism changes, commit requests
/// from writers (aligned per epoch across all running handles) and items that are ready to be
/// committed.
///
/// The function only returns on error; the caller is expected to abort the remaining handles.
async fn run_coordination(
    &mut self,
    db: DatabaseConnection,
    mut coordinator: SinkCommitCoordinator,
    subscriber: SinkCommittedEpochSubscriber,
) -> anyhow::Result<()> {
    let sink_id = self.handle_manager.param.sink_id;

    // Recover pending/aborted pre-commit records before accepting any writer request.
    let mut two_phase_handler = self
        .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
        .await?;
    match &mut coordinator {
        SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
        SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
    }

    let mut running_handles = self.handle_manager.wait_init_handles().await?;
    self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
        .await?;

    // Commit requests grouped by epoch, each group waiting for all vnodes to be covered.
    let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
    // Handles registered since the last parallelism change; they join the writer set on the
    // next `UpdateVnodeBitmap`/`Stop`.
    let mut pending_new_handles = vec![];
    loop {
        let event = self.next_event(&mut two_phase_handler).await?;
        // Every arm except `CommitRequest` handles its event completely and `continue`s.
        let (handle_id, epoch, commit_request) = match event {
            CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    pending_new_handles.push(handle_id);
                    continue;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    running_handles = self
                        .handle_manager
                        .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
                        .await?;
                    self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                        .await?;
                    continue;
                }
                CoordinationHandleManagerEvent::Stop => {
                    self.handle_manager.stop_handle(handle_id)?;
                    running_handles = self
                        .handle_manager
                        .alter_parallelisms(pending_new_handles.drain(..))
                        .await?;
                    self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                        .await?;

                    continue;
                }
                CoordinationHandleManagerEvent::CommitRequest {
                    epoch,
                    metadata,
                    schema_change,
                } => (handle_id, epoch, (metadata, schema_change)),
                CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
                    bail!("receive AlignInitialEpoch after initialization")
                }
            },
            CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
                let start_time = Instant::now();
                let commit_fut = async {
                    match &mut coordinator {
                        SinkCommitCoordinator::SinglePhase(coordinator) => {
                            // Single-phase items are only queued for schema changes, so no
                            // data metadata is expected here.
                            assert!(metadata.is_none());
                            if let Some(schema_change) = schema_change {
                                coordinator
                                    .commit_schema_change(epoch, schema_change)
                                    .instrument_await(Self::commit_span(
                                        "single_phase_schema_change",
                                        sink_id,
                                        epoch,
                                    ))
                                    .await?;
                            }
                        }
                        SinkCommitCoordinator::TwoPhase(coordinator) => {
                            // Data is committed before the schema change of the same epoch.
                            if let Some(metadata) = metadata {
                                coordinator
                                    .commit_data(epoch, metadata)
                                    .instrument_await(Self::commit_span(
                                        "two_phase_commit_data",
                                        sink_id,
                                        epoch,
                                    ))
                                    .await?;
                            }
                            if let Some(schema_change) = schema_change {
                                coordinator
                                    .commit_schema_change(epoch, schema_change)
                                    .instrument_await(Self::commit_span(
                                        "two_phase_commit_schema_change",
                                        sink_id,
                                        epoch,
                                    ))
                                    .await?;
                            }
                        }
                    }
                    Ok(())
                };
                // Log a warning every 5 seconds while the commit is still in flight.
                let commit_res =
                    run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
                        warn!(
                            elapsed = ?start_time.elapsed(),
                            %sink_id,
                            "committing"
                        );
                    })
                    .await;

                match commit_res {
                    Ok(_) => {
                        two_phase_handler.ack_committed(epoch).await?;
                    }
                    Err(e) => {
                        // The item stays queued and is retried after a backoff.
                        two_phase_handler.failed_committed(epoch, e);
                    }
                }

                continue;
            }
        };
        if !running_handles.contains(&handle_id) {
            bail!(
                "receiving commit request from non-running handle {}, running handles: {:?}",
                handle_id,
                running_handles
            );
        }
        pending_epochs.entry(epoch).or_default().add_new_request(
            handle_id,
            commit_request,
            self.handle_manager.vnode_bitmap(handle_id),
        )?;
        // Only the smallest pending epoch is examined per event.
        if pending_epochs
            .first_key_value()
            .expect("non-empty")
            .1
            .aligned()
        {
            let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
            let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
            let mut requests = commit_requests.requests.into_iter();
            let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
            metadatas.push(first_metadata);
            // All writers must agree on the schema change (or absence of one) for an epoch.
            for (metadata, schema_change) in requests {
                if first_schema_change != schema_change {
                    return Err(anyhow!(
                        "got different schema change {:?} to prev schema change {:?}",
                        schema_change,
                        first_schema_change
                    ));
                }
                metadatas.push(metadata);
            }

            match &mut coordinator {
                SinkCommitCoordinator::SinglePhase(coordinator) => {
                    // NOTE(review): `metadatas` always holds at least the first request's
                    // metadata at this point, so this check is always true.
                    if !metadatas.is_empty() {
                        let start_time = Instant::now();
                        run_future_with_periodic_fn(
                            coordinator.commit_data(epoch, metadatas).instrument_await(
                                Self::commit_span("single_phase_commit_data", sink_id, epoch),
                            ),
                            Duration::from_secs(5),
                            || {
                                warn!(
                                    elapsed = ?start_time.elapsed(),
                                    %sink_id,
                                    "committing"
                                );
                            },
                        )
                        .await
                        .map_err(|e| anyhow!(e))?;
                    }
                    // Data is committed right away; only a schema change is persisted and
                    // queued for a later commit.
                    if first_schema_change.is_some() {
                        persist_pre_commit_metadata(
                            &db,
                            sink_id as _,
                            epoch,
                            None,
                            first_schema_change.as_ref(),
                        )
                        .await?;
                        two_phase_handler.push_new_item(epoch, None, first_schema_change);
                    }
                }
                SinkCommitCoordinator::TwoPhase(coordinator) => {
                    let commit_metadata = coordinator
                        .pre_commit(epoch, metadatas, first_schema_change.clone())
                        .instrument_await(Self::commit_span(
                            "two_phase_pre_commit",
                            sink_id,
                            epoch,
                        ))
                        .await?;
                    // Persist the pre-commit result before queueing it for the actual commit.
                    if commit_metadata.is_some() || first_schema_change.is_some() {
                        persist_pre_commit_metadata(
                            &db,
                            sink_id as _,
                            epoch,
                            commit_metadata.clone(),
                            first_schema_change.as_ref(),
                        )
                        .await?;
                        two_phase_handler.push_new_item(
                            epoch,
                            commit_metadata,
                            first_schema_change,
                        );
                    }
                }
            }

            // Writers are acknowledged once the epoch is committed (single phase) or
            // pre-committed and persisted (two phase).
            self.handle_manager
                .ack_commit(epoch, commit_requests.handle_ids)?;
            self.last_writer_acked_epoch = Some(epoch);
        }
    }
}
885
886 async fn init_state_from_store(
888 &mut self,
889 db: &DatabaseConnection,
890 sink_id: SinkId,
891 subscriber: SinkCommittedEpochSubscriber,
892 coordinator: &mut SinkCommitCoordinator,
893 ) -> anyhow::Result<TwoPhaseCommitHandler> {
894 let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;
895
896 let mut metadata_iter = ordered_metadata.into_iter().peekable();
897 let last_committed_epoch = metadata_iter
898 .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
899 .map(|(epoch, _, _, _)| epoch);
900
901 let pending_items = metadata_iter
902 .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
903 .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
904 .collect_vec();
905 self.last_writer_acked_epoch = pending_items
906 .last()
907 .map(|(epoch, _, _)| *epoch)
908 .or(last_committed_epoch);
909
910 let mut aborted_epochs = vec![];
911
912 for (epoch, state, metadata, _) in metadata_iter {
913 match state {
914 SinkState::Aborted => {
915 if let Some(metadata) = metadata
916 && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
917 {
918 coordinator.abort(epoch, metadata).await;
919 }
920 aborted_epochs.push(epoch);
921 }
922 other => {
923 unreachable!(
924 "unexpected state {:?} after pending items at epoch {}",
925 other, epoch
926 );
927 }
928 }
929 }
930
931 clean_aborted_records(db, sink_id, aborted_epochs).await?;
933
934 let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
935 let mut two_phase_handler = TwoPhaseCommitHandler::new(
936 db.clone(),
937 sink_id,
938 initial_hummock_committed_epoch,
939 job_committed_epoch_rx,
940 last_committed_epoch,
941 );
942
943 for (epoch, metadata, schema_change) in pending_items {
944 two_phase_handler.push_new_item(epoch, metadata, schema_change);
945 }
946
947 Ok(two_phase_handler)
948 }
949
950 fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
951 await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
952 }
953}