risingwave_meta/manager/sink_coordination/coordinator_worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::{Future, poll_fn};
use std::pin::pin;
use std::task::Poll;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{Either, pending, select};
use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Field;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::boxed::BoxTwoPhaseCoordinator;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{
    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
};
use risingwave_meta_model::pending_sink_state::SinkState;
use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::sleep;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tonic::Status;
use tracing::{error, warn};

use crate::manager::sink_coordination::exactly_once_util::{
    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
    persist_pre_commit_metadata,
};
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

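/// Drive `future` to completion, invoking `f` every `interval` while it is
/// still pending. Used below to emit periodic "committing" logs during
/// long-running sink commits.
///
/// A hedged usage sketch (illustrative only; `some_async_op` is hypothetical):
///
/// ```ignore
/// let output = run_future_with_periodic_fn(
///     some_async_op(),
///     Duration::from_secs(5),
///     || tracing::warn!("still running"),
/// )
/// .await;
/// ```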
async fn run_future_with_periodic_fn<F: Future>(
    future: F,
    interval: Duration,
    mut f: impl FnMut(),
) -> F::Output {
    pin_mut!(future);
    loop {
        match select(&mut future, pin!(sleep(interval))).await {
            Either::Left((output, _)) => {
                break output;
            }
            Either::Right(_) => f(),
        }
    }
}

type HandleId = usize;

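/// Accumulates one request per writer handle until the union of the writers'
/// vnode bitmaps covers every vnode, i.e. until the whole sink job is aligned.
///
/// A hedged sketch of the intended usage (hypothetical bitmaps covering
/// disjoint halves of the vnode space):
///
/// ```ignore
/// let mut reqs = AligningRequests::default();
/// reqs.add_new_request(0, req_a, &first_half_bitmap)?;
/// assert!(!reqs.aligned());
/// reqs.add_new_request(1, req_b, &second_half_bitmap)?;
/// assert!(reqs.aligned()); // all vnodes covered exactly once
/// ```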
#[derive(Default)]
struct AligningRequests<R> {
    requests: Vec<R>,
    handle_ids: HashSet<HandleId>,
    committed_bitmap: Option<Bitmap>, // lazily initialized on the first request
}

impl<R> AligningRequests<R> {
    fn add_new_request(
        &mut self,
        handle_id: HandleId,
        request: R,
        vnode_bitmap: &Bitmap,
    ) -> anyhow::Result<()>
    where
        R: Debug,
    {
        let committed_bitmap = self
            .committed_bitmap
            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());

        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
        if check_bitmap.count_ones() > 0 {
            return Err(anyhow!(
                "duplicate vnodes {:?}; request bitmap: {:?}, already-covered bitmap: {:?}; pending requests: {:?}, new request: {:?}",
                check_bitmap.iter_ones().collect_vec(),
                vnode_bitmap,
                committed_bitmap,
                self.requests,
                request
            ));
        }
        *committed_bitmap |= vnode_bitmap;
        self.requests.push(request);
        assert!(self.handle_ids.insert(handle_id));
        Ok(())
    }

    fn aligned(&self) -> bool {
        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
    }
}

type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

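/// Drives the second phase of two-phase sink commits.
///
/// Pre-committed `(epoch, metadata)` pairs wait in `pending_epochs` until
/// their epoch is observed as committed in hummock (via
/// `job_committed_epoch_rx`), then move to `prepared_epochs`, from which they
/// are committed to the external sink with exponential backoff on failure.
/// Illustrative state flow (a sketch, not code in this module):
///
/// ```text
/// push_new_item(epoch, meta)
///   epoch >  hummock committed: pending_epochs --(hummock commit)--> prepared_epochs
///   epoch <= hummock committed: prepared_epochs --(sink commit ok)--> persisted & pruned
/// ```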
struct TwoPhaseCommitHandler {
    db: DatabaseConnection,
    sink_id: SinkId,
    /// Latest epoch known to be committed in hummock.
    curr_hummock_committed_epoch: u64,
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    /// The epoch most recently committed to the external sink, if any.
    last_committed_epoch: Option<u64>,
    /// Pre-committed `(epoch, metadata)` pairs waiting for the hummock commit of their epoch.
    pending_epochs: VecDeque<(u64, Vec<u8>)>,
    /// `(epoch, metadata)` pairs committed in hummock and ready to be committed to the sink.
    prepared_epochs: VecDeque<(u64, Vec<u8>)>,
    /// Backoff timer and strategy for retrying a failed sink commit.
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}

impl TwoPhaseCommitHandler {
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

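    /// Wait until the front prepared epoch may be committed to the sink.
    ///
    /// Progress comes from either of two sources: the retry backoff (if any)
    /// for the front prepared epoch elapses, or a newly committed hummock
    /// epoch arrives and promotes pending epochs to prepared.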
    async fn next_to_commit(&mut self) -> anyhow::Result<(u64, Vec<u8>)> {
        loop {
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let (epoch, metadata) = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok((epoch, metadata));
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    while let Some((epoch, metadata)) = self.pending_epochs.pop_front_if(|(epoch, _)| *epoch <= recv_epoch) {
                        if let Some((last_epoch, _)) = self.prepared_epochs.back() {
                            assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
                        }
                        self.prepared_epochs.push_back((epoch, metadata));
                    }
                }
            }
        }
    }

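    /// Queue newly pre-committed metadata. Epochs not yet committed in
    /// hummock go to `pending_epochs`; epochs already covered by the hummock
    /// committed epoch become prepared immediately. Both queues stay strictly
    /// increasing.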
    fn push_new_item(&mut self, epoch: u64, metadata: Vec<u8>) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs.push_back((epoch, metadata));
        } else {
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs.push_back((epoch, metadata));
        }
    }

    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, retrying after backoff",
            epoch,
        );
    }

    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }
}

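/// Owns the coordination handles of all sink writers of one sink job and
/// multiplexes their requests into a single event stream for the worker.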
struct CoordinationHandleManager {
    param: SinkParam,
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    next_handle_id: HandleId,
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}

impl CoordinationHandleManager {
    fn start(
        &mut self,
        log_store_rewind_start_epoch: Option<u64>,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self
                .writer_handles
                .get_mut(&handle_id)
                .ok_or_else(|| anyhow!("failed to find handle {} to start", handle_id))?;
            handle.start(log_store_rewind_start_epoch).map_err(|_| {
                anyhow!(
                    "failed to start {:?} for handle {}",
                    log_store_rewind_start_epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
        for (handle_id, handle) in &mut self.writer_handles {
            handle
                .ack_aligned_initial_epoch(aligned_initial_epoch)
                .map_err(|_| {
                    anyhow!(
                        "failed to ack_aligned_initial_epoch {:?} for handle {}",
                        aligned_initial_epoch,
                        handle_id
                    )
                })?;
        }
        Ok(())
    }

    fn ack_commit(
        &mut self,
        epoch: u64,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
                anyhow!(
                    "failed to find handle {} when acking commit on epoch {}",
                    handle_id,
                    epoch
                )
            })?;
            handle.ack_commit(epoch).map_err(|_| {
                anyhow!(
                    "failed to ack commit on epoch {} for handle {}",
                    epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    async fn next_request_inner(
        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
        poll_fn(|cx| {
            for (handle_id, handle) in writer_handles.iter_mut() {
                if let Poll::Ready(result) = handle.poll_next_request(cx) {
                    return Poll::Ready(result.map(|request| (*handle_id, request)));
                }
            }
            Poll::Pending
        })
        .await
    }
}

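/// Events surfaced by [`CoordinationHandleManager::next_event`]: a new writer
/// handle registering, or a request from an existing handle.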
enum CoordinationHandleManagerEvent {
    NewHandle,
    UpdateVnodeBitmap,
    Stop,
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        add_columns: Option<Vec<Field>>,
    },
    AlignInitialEpoch(u64),
}

impl CoordinationHandleManagerEvent {
    fn name(&self) -> &'static str {
        match self {
            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
            CoordinationHandleManagerEvent::Stop => "Stop",
            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
        }
    }
}

impl CoordinationHandleManager {
    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
        select! {
            handle = self.request_rx.recv() => {
                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
                if handle.param() != &self.param {
                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
                }
                let handle_id = self.next_handle_id;
                self.next_handle_id += 1;
                self.writer_handles.insert(handle_id, handle);
                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
            }
            result = Self::next_request_inner(&mut self.writer_handles) => {
                let (handle_id, request) = result?;
                let event = match request {
                    coordinate_request::Msg::CommitRequest(request) => {
                        CoordinationHandleManagerEvent::CommitRequest {
                            epoch: request.epoch,
                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
                            add_columns: request.add_columns.map(|add_columns| add_columns.fields.into_iter().map(|field| Field::from_prost(&field)).collect()),
                        }
                    }
                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
                    }
                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
                    }
                    coordinate_request::Msg::Stop(_) => {
                        CoordinationHandleManagerEvent::Stop
                    }
                    coordinate_request::Msg::StartRequest(_) => {
                        unreachable!("should have been handled");
                    }
                };
                Ok((handle_id, event))
            }
        }
    }

    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
        self.writer_handles[&handle_id].vnode_bitmap()
    }

    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
        self.writer_handles
            .remove(&handle_id)
            .expect("should exist")
            .stop()
    }

    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
        assert!(self.writer_handles.is_empty());
        let mut init_requests = AligningRequests::default();
        while !init_requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            let unexpected_event = match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                    continue;
                }
                event => event.name(),
            };
            return Err(anyhow!(
                "expected a new handle during init, but got {}",
                unexpected_event
            ));
        }
        Ok(init_requests.handle_ids)
    }

    async fn alter_parallelisms(
        &mut self,
        altered_handles: impl Iterator<Item = HandleId>,
    ) -> anyhow::Result<HashSet<HandleId>> {
        let mut requests = AligningRequests::default();
        for handle_id in altered_handles {
            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
        }
        let mut remaining_handles: HashSet<_> = self
            .writer_handles
            .keys()
            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
            .cloned()
            .collect();
        while !remaining_handles.is_empty() || !requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    assert!(remaining_handles.remove(&handle_id));
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::Stop => {
                    assert!(remaining_handles.remove(&handle_id));
                    self.stop_handle(handle_id)?;
                }
                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
                    bail!(
                        "received commit request on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
                    bail!(
                        "received AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
            }
        }
        Ok(requests.handle_ids)
    }
}

/// Represents the coordinator worker's state machine for handling schema changes.
///
/// - `Running`: normal operation; handles can be started immediately.
/// - `WaitingForFlushed`: waiting for all pending two-phase commits to complete before starting
///   new handles. This ensures new sink executors load the correct schema.
enum CoordinatorWorkerState {
    Running,
    WaitingForFlushed(HashSet<HandleId>),
}

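/// Per-sink coordinator task: aligns commit requests from all writers, drives
/// the sink's commit coordinator, and, for two-phase sinks, persists
/// pre-commit metadata so that commits can be resumed after recovery.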
pub struct CoordinatorWorker {
    handle_manager: CoordinationHandleManager,
    prev_committed_epoch: Option<u64>,
    curr_state: CoordinatorWorkerState,
}

enum CoordinatorWorkerEvent {
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    ReadyToCommit(u64, Vec<u8>),
}

impl CoordinatorWorker {
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        dispatch_sink!(sink, sink, {
            let coordinator = match sink
                .new_coordinator(Some(iceberg_compact_stat_sender))
                .await
            {
                Ok(coordinator) => coordinator,
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "unable to build coordinator with param {:?}",
                        param
                    );
                    return;
                }
            };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }

    pub async fn execute_coordinator(
        db: DatabaseConnection,
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) {
        let mut worker = CoordinatorWorker {
            handle_manager: CoordinationHandleManager {
                param,
                writer_handles: HashMap::new(),
                next_handle_id: 0,
                request_rx,
            },
            prev_committed_epoch: None,
            curr_state: CoordinatorWorkerState::Running,
        };

        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
            for handle in worker.handle_manager.writer_handles.into_values() {
                handle.abort(Status::internal(format!(
                    "failed to run coordination: {:?}",
                    e.as_report()
                )))
            }
        }
    }

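    /// Start newly initialized handles, or, when two-phase commits are still
    /// in flight, defer the start until all pending epochs are flushed so
    /// that new sink executors observe the fully committed schema.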
    async fn try_handle_init_requests(
        &mut self,
        pending_handle_ids: &HashSet<HandleId>,
        two_phase_handler: Option<&mut TwoPhaseCommitHandler>,
    ) -> anyhow::Result<()> {
        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
        if let Some(two_phase_handler) = two_phase_handler
            && !two_phase_handler.is_empty()
        {
            // Delay handling init requests until all pending epochs are flushed.
            self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
        } else {
            self.handle_init_requests_impl(pending_handle_ids.clone())
                .await?;
        }
        Ok(())
    }

    async fn handle_init_requests_impl(
        &mut self,
        pending_handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        let log_store_rewind_start_epoch = self.prev_committed_epoch;
        self.handle_manager
            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
        if log_store_rewind_start_epoch.is_none() {
            let mut align_requests = AligningRequests::default();
            while !align_requests.aligned() {
                let (handle_id, event) = self.handle_manager.next_event().await?;
                match event {
                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
                        align_requests.add_new_request(
                            handle_id,
                            initial_epoch,
                            self.handle_manager.vnode_bitmap(handle_id),
                        )?;
                    }
                    other => {
                        return Err(anyhow!("expected AlignInitialEpoch but got {}", other.name()));
                    }
                }
            }
            let aligned_initial_epoch = align_requests
                .requests
                .into_iter()
                .max()
                .expect("non-empty");
            self.handle_manager
                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
        }
        Ok(())
    }

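    /// Wait for the next worker event: an event from a writer handle, or, for
    /// two-phase sinks, a prepared epoch that is ready to commit. Also resumes
    /// deferred handle initialization once all pending commits are flushed.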
    async fn next_event(
        &mut self,
        mut two_phase_handler: Option<&mut TwoPhaseCommitHandler>,
    ) -> anyhow::Result<CoordinatorWorkerEvent> {
        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state {
            let handler = two_phase_handler
                .as_mut()
                .expect("two-phase handler should exist when waiting for flush");
            if handler.is_empty() {
                let pending_handle_ids = pending_handle_ids.clone();
                self.handle_init_requests_impl(pending_handle_ids).await?;
                self.curr_state = CoordinatorWorkerState::Running;
            }
        }

        // For a single-phase coordinator, there is nothing to wait for.
        let two_phase_next_fut = async {
            if let Some(handler) = two_phase_handler {
                handler.next_to_commit().await
            } else {
                pending().await
            }
        };
        select! {
            next_handle_event = self.handle_manager.next_event() => {
                let (handle_id, event) = next_handle_event?;
                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
            }

            next_item_to_commit = two_phase_next_fut => {
                let (epoch, metadata) = next_item_to_commit?;
                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata))
            }
        }
    }

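    /// Main coordination loop. Commit requests are buffered per epoch in
    /// `pending_epochs` until every vnode has reported; a fully aligned epoch
    /// is then committed directly (single-phase) or pre-committed, persisted,
    /// and handed to the [`TwoPhaseCommitHandler`] (two-phase).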
    async fn run_coordination(
        &mut self,
        db: DatabaseConnection,
        mut coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) -> anyhow::Result<()> {
        let sink_id = self.handle_manager.param.sink_id;

        let mut two_phase_handler = match &mut coordinator {
            SinkCommitCoordinator::SinglePhase(coordinator) => {
                coordinator.init().await?;
                None
            }
            SinkCommitCoordinator::TwoPhase(coordinator) => {
                let two_phase_handler = self
                    .init_state_from_store(&db, sink_id, subscriber, coordinator)
                    .await?;
                coordinator.init().await?;
                Some(two_phase_handler)
            }
        };

        let mut running_handles = self.handle_manager.wait_init_handles().await?;
        self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
            .await?;

        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
        let mut pending_new_handles = vec![];
        loop {
            let event = self.next_event(two_phase_handler.as_mut()).await?;
            let (handle_id, epoch, commit_request) = match event {
                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
                    CoordinationHandleManagerEvent::NewHandle => {
                        pending_new_handles.push(handle_id);
                        continue;
                    }
                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
                            .await?;
                        self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
                            .await?;
                        continue;
                    }
                    CoordinationHandleManagerEvent::Stop => {
                        self.handle_manager.stop_handle(handle_id)?;
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..))
                            .await?;
                        self.try_handle_init_requests(&running_handles, two_phase_handler.as_mut())
                            .await?;

                        continue;
                    }
                    CoordinationHandleManagerEvent::CommitRequest {
                        epoch,
                        metadata,
                        add_columns,
                    } => (handle_id, epoch, (metadata, add_columns)),
                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
                        bail!("received AlignInitialEpoch after initialization")
                    }
                },
                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata) => {
                    let SinkCommitCoordinator::TwoPhase(coordinator) = &mut coordinator else {
                        unreachable!("should be a two-phase commit coordinator");
                    };
                    let two_phase_handler = two_phase_handler.as_mut().expect("should exist");
                    let start_time = Instant::now();
                    let commit_res = run_future_with_periodic_fn(
                        coordinator.commit(epoch, metadata),
                        Duration::from_secs(5),
                        || {
                            warn!(
                                elapsed = ?start_time.elapsed(),
                                %sink_id,
                                "committing"
                            );
                        },
                    )
                    .await;

                    match commit_res {
                        Ok(_) => {
                            two_phase_handler.ack_committed(epoch).await?;
                            self.prev_committed_epoch = Some(epoch);
                        }
                        Err(e) => {
                            two_phase_handler.failed_committed(epoch, e);
                        }
                    }

                    continue;
                }
            };
            if !running_handles.contains(&handle_id) {
                bail!(
                    "received commit request from non-running handle {}, running handles: {:?}",
                    handle_id,
                    running_handles
                );
            }
            pending_epochs.entry(epoch).or_default().add_new_request(
                handle_id,
                commit_request,
                self.handle_manager.vnode_bitmap(handle_id),
            )?;
            if pending_epochs
                .first_key_value()
                .expect("non-empty")
                .1
                .aligned()
            {
                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
                let mut requests = commit_requests.requests.into_iter();
                let (first_metadata, first_add_columns) = requests.next().expect("non-empty");
                metadatas.push(first_metadata);
                for (metadata, add_columns) in requests {
                    if first_add_columns != add_columns {
                        return Err(anyhow!(
                            "got add columns {:?} different from previous add columns {:?}",
                            add_columns,
                            first_add_columns
                        ));
                    }
                    metadatas.push(metadata);
                }

                match &mut coordinator {
                    SinkCommitCoordinator::SinglePhase(coordinator) => {
                        let start_time = Instant::now();
                        run_future_with_periodic_fn(
                            coordinator.commit(epoch, metadatas, first_add_columns),
                            Duration::from_secs(5),
                            || {
                                warn!(
                                    elapsed = ?start_time.elapsed(),
                                    %sink_id,
                                    "committing"
                                );
                            },
                        )
                        .await
                        .map_err(|e| anyhow!(e))?;
                        self.handle_manager
                            .ack_commit(epoch, commit_requests.handle_ids)?;
                        self.prev_committed_epoch = Some(epoch);
                    }
                    SinkCommitCoordinator::TwoPhase(coordinator) => {
                        let commit_metadata = coordinator
                            .pre_commit(epoch, metadatas, first_add_columns)
                            .await?;
                        persist_pre_commit_metadata(
                            &db,
                            sink_id as _,
                            epoch,
                            commit_metadata.clone(),
                        )
                        .await?;
                        self.handle_manager
                            .ack_commit(epoch, commit_requests.handle_ids)?;

                        let two_phase_handler = two_phase_handler.as_mut().expect("should exist");
                        two_phase_handler.push_new_item(epoch, commit_metadata);
                    }
                }
            }
        }
    }

    /// Return a [`TwoPhaseCommitHandler`] initialized from the state persisted in the meta store.
    async fn init_state_from_store(
        &mut self,
        db: &DatabaseConnection,
        sink_id: SinkId,
        subscriber: SinkCommittedEpochSubscriber,
        coordinator: &mut BoxTwoPhaseCoordinator,
    ) -> anyhow::Result<TwoPhaseCommitHandler> {
        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;

        let mut metadata_iter = ordered_metadata.into_iter().peekable();
        let last_committed_epoch = metadata_iter
            .next_if(|(_, state, _)| matches!(state, SinkState::Committed))
            .map(|(epoch, _, _)| epoch);
        self.prev_committed_epoch = last_committed_epoch;

        let pending_items = metadata_iter
            .peeking_take_while(|(_, state, _)| matches!(state, SinkState::Pending))
            .map(|(epoch, _, metadata)| (epoch, metadata))
            .collect_vec();

        let mut aborted_epochs = vec![];

        for (epoch, state, metadata) in metadata_iter {
            match state {
                SinkState::Aborted => {
                    coordinator.abort(epoch, metadata).await;
                    aborted_epochs.push(epoch);
                }
                other => {
                    unreachable!(
                        "unexpected state {:?} after pending items at epoch {}",
                        other, epoch
                    );
                }
            }
        }

        // Records for all aborted epochs and previously committed epochs are no longer needed.
        clean_aborted_records(db, sink_id, aborted_epochs).await?;

        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
        let mut two_phase_handler = TwoPhaseCommitHandler::new(
            db.clone(),
            sink_id,
            initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
        );

        for (epoch, metadata) in pending_items {
            two_phase_handler.push_new_item(epoch, metadata);
        }

        Ok(two_phase_handler)
    }
}