risingwave_meta/manager/sink_coordination/
coordinator_worker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::{Either, pending, select};
25use futures::pin_mut;
26use itertools::Itertools;
27use risingwave_common::bail;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::SinkId;
32use risingwave_connector::sink::{
33    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
34};
35use risingwave_meta_model::pending_sink_state::SinkState;
36use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
37use risingwave_pb::stream_plan::PbSinkSchemaChange;
38use sea_orm::DatabaseConnection;
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::time::sleep;
43use tokio_retry::strategy::{ExponentialBackoff, jitter};
44use tonic::Status;
45use tracing::{error, warn};
46
47use crate::manager::sink_coordination::exactly_once_util::{
48    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
49    persist_pre_commit_metadata,
50};
51use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
52
53async fn run_future_with_periodic_fn<F: Future>(
54    future: F,
55    interval: Duration,
56    mut f: impl FnMut(),
57) -> F::Output {
58    pin_mut!(future);
59    loop {
60        match select(&mut future, pin!(sleep(interval))).await {
61            Either::Left((output, _)) => {
62                break output;
63            }
64            Either::Right(_) => f(),
65        }
66    }
67}
68
/// Identifier that `CoordinationHandleManager` assigns to each registered sink writer
/// coordination handle; ids come from an incrementing counter (`next_handle_id`).
type HandleId = usize;
70
71#[derive(Default)]
72struct AligningRequests<R> {
73    requests: Vec<R>,
74    handle_ids: HashSet<HandleId>,
75    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
76}
77
78impl<R> AligningRequests<R> {
79    fn add_new_request(
80        &mut self,
81        handle_id: HandleId,
82        request: R,
83        vnode_bitmap: &Bitmap,
84    ) -> anyhow::Result<()>
85    where
86        R: Debug,
87    {
88        let committed_bitmap = self
89            .committed_bitmap
90            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
91        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
92
93        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
94        if check_bitmap.count_ones() > 0 {
95            return Err(anyhow!(
96                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
97                check_bitmap.iter_ones().collect_vec(),
98                vnode_bitmap,
99                committed_bitmap,
100                self.requests,
101                request
102            ));
103        }
104        *committed_bitmap |= vnode_bitmap;
105        self.requests.push(request);
106        assert!(self.handle_ids.insert(handle_id));
107        Ok(())
108    }
109
110    fn aligned(&self) -> bool {
111        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
112    }
113}
114
/// Timer awaited before retrying a failed commit.
type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
/// Endless iterator of retry timers; its concrete type is defined by
/// `TwoPhaseCommitHandler::get_retry_backoff_strategy`.
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

/// Sequences the commits of one sink.
///
/// Items are `(epoch, metadata, schema_change)` tuples. An item becomes eligible for committing
/// only once the committed epoch received on `job_committed_epoch_rx` reaches its epoch. Failed
/// commits of the front item are retried after a backoff.
struct TwoPhaseCommitHandler {
    /// Connection passed to `commit_and_prune_epoch` when an epoch is acknowledged.
    db: DatabaseConnection,
    sink_id: SinkId,
    /// Latest committed epoch received from `job_committed_epoch_rx` (initially the value given
    /// to `new`).
    curr_hummock_committed_epoch: u64,
    /// Committed-epoch updates; `next_to_commit` fails if this channel is closed.
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    /// Last epoch acknowledged via `ack_committed`, if any.
    last_committed_epoch: Option<u64>,
    /// Items whose epoch is still above `curr_hummock_committed_epoch`, in increasing epoch order.
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    /// Items eligible for committing, in increasing epoch order; the front is committed next.
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    /// Current retry timer plus the strategy producing the following ones. It is `None` unless
    /// the front of `prepared_epochs` has failed to commit.
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}

impl TwoPhaseCommitHandler {
    /// Creates a handler with empty queues and no retry in progress.
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    /// Builds the retry schedule: an endless sequence of pinned sleeps.
    ///
    /// The delays come from `tokio_retry`'s `ExponentialBackoff::from_millis(10)`, are capped at
    /// 60 seconds by `max_delay`, and are each randomized by `jitter`.
    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

    /// Waits until the front of `prepared_epochs` may be committed and returns a clone of it.
    ///
    /// The item is not removed here; `ack_committed` pops it after a successful commit. If a
    /// retry is in progress, the current backoff timer must elapse first. While waiting,
    /// committed-epoch updates move every item of `pending_epochs` whose epoch is `<=` the
    /// received epoch into `prepared_epochs`.
    ///
    /// # Errors
    ///
    /// Fails if the committed-epoch channel has been closed.
    ///
    /// # Panics
    ///
    /// Panics if moving pending items would break the increasing order of `prepared_epochs`.
    async fn next_to_commit(
        &mut self,
    ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
        loop {
            // Resolves when the front item may be committed: never while nothing is prepared,
            // immediately when no retry is in progress, otherwise after the backoff timer.
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let item = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok(item);
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    // Promote every pending item now covered by the committed epoch.
                    while let Some((epoch, metadata, schema_change)) = self.pending_epochs.pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch) {
                        if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                            assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
                        }
                        self.prepared_epochs.push_back((epoch, metadata, schema_change));
                    }
                }
            }
        }
    }

    /// Queues a new item.
    ///
    /// An item whose epoch is above `curr_hummock_committed_epoch` waits in `pending_epochs`;
    /// otherwise it goes straight to `prepared_epochs`.
    ///
    /// # Panics
    ///
    /// Panics if `epoch` is not greater than the last epoch of the queue it is appended to, or if
    /// an already-covered epoch arrives while `pending_epochs` is non-empty.
    fn push_new_item(
        &mut self,
        epoch: u64,
        metadata: Option<Vec<u8>>,
        schema_change: Option<PbSinkSchemaChange>,
    ) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs
                .push_back((epoch, metadata, schema_change));
        } else {
            // Pending items are all above the committed epoch, so an item at or below it can
            // only be accepted in increasing order if nothing is pending.
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs
                .push_back((epoch, metadata, schema_change));
        }
    }

    /// Records that the front item, which must be `epoch`, was committed successfully.
    ///
    /// It clears any retry state, pops the item and calls `commit_and_prune_epoch` with the
    /// previous `last_committed_epoch`. Only after that call succeeds does `epoch` become the
    /// new `last_committed_epoch`.
    ///
    /// NOTE(review): the item is popped before `commit_and_prune_epoch` runs, so if that call
    /// fails the error is returned with the item already removed.
    ///
    /// # Panics
    ///
    /// Panics if `prepared_epochs` is empty or its front is not `epoch`.
    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

    /// Records that committing the front item, which must be `epoch`, failed with `err`.
    ///
    /// On the first failure this starts a fresh backoff strategy; on later failures it advances
    /// the existing one. The item stays at the front and is returned again by `next_to_commit`
    /// once the new timer elapses. The error is logged.
    ///
    /// # Panics
    ///
    /// Panics if `prepared_epochs` is empty or its front is not `epoch`.
    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch,);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, Retrying after backoff",
            epoch,
        );
    }

    /// Returns `true` when neither pending nor prepared items remain.
    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }

    /// Whether there exists an uncommitted schema change.
    ///
    /// Per current design, if a `schema_change` exists, it should be attached to the latest
    /// uncommitted item across `pending_epochs` and `prepared_epochs`.
    fn has_uncommitted_schema_change(&self) -> bool {
        if let Some((_, _, schema_change)) = self.pending_epochs.back() {
            schema_change.is_some()
        } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
            schema_change.is_some()
        } else {
            false
        }
    }
}
267
268struct CoordinationHandleManager {
269    param: SinkParam,
270    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
271    next_handle_id: HandleId,
272    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
273}
274
275impl CoordinationHandleManager {
276    fn start(
277        &mut self,
278        log_store_rewind_start_epoch: Option<u64>,
279        handle_ids: impl IntoIterator<Item = HandleId>,
280    ) -> anyhow::Result<()> {
281        for handle_id in handle_ids {
282            let handle = self
283                .writer_handles
284                .get_mut(&handle_id)
285                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
286            handle.start(log_store_rewind_start_epoch).map_err(|_| {
287                anyhow!(
288                    "fail to start {:?} for handle {}",
289                    log_store_rewind_start_epoch,
290                    handle_id
291                )
292            })?;
293        }
294        Ok(())
295    }
296
297    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
298        for (handle_id, handle) in &mut self.writer_handles {
299            handle
300                .ack_aligned_initial_epoch(aligned_initial_epoch)
301                .map_err(|_| {
302                    anyhow!(
303                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
304                        aligned_initial_epoch,
305                        handle_id
306                    )
307                })?;
308        }
309        Ok(())
310    }
311
312    fn ack_commit(
313        &mut self,
314        epoch: u64,
315        handle_ids: impl IntoIterator<Item = HandleId>,
316    ) -> anyhow::Result<()> {
317        for handle_id in handle_ids {
318            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
319                anyhow!(
320                    "fail to find handle for {} when ack commit on epoch {}",
321                    handle_id,
322                    epoch
323                )
324            })?;
325            handle.ack_commit(epoch).map_err(|_| {
326                anyhow!(
327                    "fail to ack commit on epoch {} for handle {}",
328                    epoch,
329                    handle_id
330                )
331            })?;
332        }
333        Ok(())
334    }
335
336    async fn next_request_inner(
337        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
338    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
339        poll_fn(|cx| {
340            for (handle_id, handle) in writer_handles.iter_mut() {
341                if let Poll::Ready(result) = handle.poll_next_request(cx) {
342                    return Poll::Ready(result.map(|request| (*handle_id, request)));
343                }
344            }
345            Poll::Pending
346        })
347        .await
348    }
349}
350
351enum CoordinationHandleManagerEvent {
352    NewHandle,
353    UpdateVnodeBitmap,
354    Stop,
355    CommitRequest {
356        epoch: u64,
357        metadata: SinkMetadata,
358        schema_change: Option<PbSinkSchemaChange>,
359    },
360    AlignInitialEpoch(u64),
361}
362
363impl CoordinationHandleManagerEvent {
364    fn name(&self) -> &'static str {
365        match self {
366            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
367            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
368            CoordinationHandleManagerEvent::Stop => "Stop",
369            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
370            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
371        }
372    }
373}
374
375impl CoordinationHandleManager {
376    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
377        select! {
378            handle = self.request_rx.recv() => {
379                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
380                if handle.param() != &self.param {
381                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
382                }
383                let handle_id = self.next_handle_id;
384                self.next_handle_id += 1;
385                self.writer_handles.insert(handle_id, handle);
386                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
387            }
388            result = Self::next_request_inner(&mut self.writer_handles) => {
389                let (handle_id, request) = result?;
390                let event = match request {
391                    coordinate_request::Msg::CommitRequest(request) => {
392                        CoordinationHandleManagerEvent::CommitRequest {
393                            epoch: request.epoch,
394                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
395                            schema_change: request.schema_change,
396                        }
397                    }
398                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
399                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
400                    }
401                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
402                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
403                    }
404                    coordinate_request::Msg::Stop(_) => {
405                        CoordinationHandleManagerEvent::Stop
406                    }
407                    coordinate_request::Msg::StartRequest(_) => {
408                        unreachable!("should have been handled");
409                    }
410                };
411                Ok((handle_id, event))
412            }
413        }
414    }
415
416    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
417        self.writer_handles[&handle_id].vnode_bitmap()
418    }
419
420    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
421        self.writer_handles
422            .remove(&handle_id)
423            .expect("should exist")
424            .stop()
425    }
426
427    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
428        assert!(self.writer_handles.is_empty());
429        let mut init_requests = AligningRequests::default();
430        while !init_requests.aligned() {
431            let (handle_id, event) = self.next_event().await?;
432            let unexpected_event = match event {
433                CoordinationHandleManagerEvent::NewHandle => {
434                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
435                    continue;
436                }
437                event => event.name(),
438            };
439            return Err(anyhow!(
440                "expect new handle during init, but got {}",
441                unexpected_event
442            ));
443        }
444        Ok(init_requests.handle_ids)
445    }
446
447    async fn alter_parallelisms(
448        &mut self,
449        altered_handles: impl Iterator<Item = HandleId>,
450    ) -> anyhow::Result<HashSet<HandleId>> {
451        let mut requests = AligningRequests::default();
452        for handle_id in altered_handles {
453            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
454        }
455        let mut remaining_handles: HashSet<_> = self
456            .writer_handles
457            .keys()
458            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
459            .cloned()
460            .collect();
461        while !remaining_handles.is_empty() || !requests.aligned() {
462            let (handle_id, event) = self.next_event().await?;
463            match event {
464                CoordinationHandleManagerEvent::NewHandle => {
465                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
466                }
467                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
468                    assert!(remaining_handles.remove(&handle_id));
469                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
470                }
471                CoordinationHandleManagerEvent::Stop => {
472                    assert!(remaining_handles.remove(&handle_id));
473                    self.stop_handle(handle_id)?;
474                }
475                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
476                    bail!(
477                        "receive commit request on epoch {} from handle {} during alter parallelism",
478                        epoch,
479                        handle_id
480                    );
481                }
482                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
483                    bail!(
484                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
485                        epoch,
486                        handle_id
487                    );
488                }
489            }
490        }
491        Ok(requests.handle_ids)
492    }
493}
494
495/// Represents the coordinator worker's state machine for handling schema changes.
496///
497/// - `Running`: Normal operation, handles can be started immediately
498/// - `WaitingForFlushed`: Waiting for all pending two-phase commits to complete before starting new handles. This
499///   ensures new sink executors load the correct schema.
500enum CoordinatorWorkerState {
501    Running,
502    WaitingForFlushed(HashSet<HandleId>),
503}
504
505pub struct CoordinatorWorker {
506    handle_manager: CoordinationHandleManager,
507    /// Last epoch whose commit has been acknowledged to sink writers.
508    ///
509    /// On recovery, pending sink states are treated as already acknowledged to writers, so this is
510    /// initialized to the latest pending epoch if any. Otherwise, it starts from the latest
511    /// committed epoch persisted in `pending_sink_state`.
512    last_writer_acked_epoch: Option<u64>,
513    curr_state: CoordinatorWorkerState,
514}
515
516enum CoordinatorWorkerEvent {
517    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
518    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
519}
520
521impl CoordinatorWorker {
    /// Builds the sink and its commit coordinator from `param`, then hands over to
    /// `execute_coordinator`.
    ///
    /// If the sink or the coordinator cannot be built, the error is logged and the function
    /// returns early. `request_rx` is then dropped without serving any queued writer handle.
    #[expect(clippy::large_stack_frames)]
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        // `dispatch_sink!` presumably expands the block once per concrete sink type, binding it
        // as `sink`. The coordinator-building future is boxed, likely to keep it off this
        // (already large) stack frame; confirm.
        dispatch_sink!(sink, sink, {
            let coordinator =
                match Box::pin(sink.new_coordinator(Some(iceberg_compact_stat_sender))).await {
                    Ok(coordinator) => coordinator,
                    Err(e) => {
                        error!(
                            error = %e.as_report(),
                            "unable to build coordinator with param {:?}",
                            param
                        );
                        return;
                    }
                };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }
558
559    pub async fn execute_coordinator(
560        db: DatabaseConnection,
561        param: SinkParam,
562        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
563        coordinator: SinkCommitCoordinator,
564        subscriber: SinkCommittedEpochSubscriber,
565    ) {
566        let mut worker = CoordinatorWorker {
567            handle_manager: CoordinationHandleManager {
568                param,
569                writer_handles: HashMap::new(),
570                next_handle_id: 0,
571                request_rx,
572            },
573            last_writer_acked_epoch: None,
574            curr_state: CoordinatorWorkerState::Running,
575        };
576
577        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
578            for handle in worker.handle_manager.writer_handles.into_values() {
579                handle.abort(Status::internal(format!(
580                    "failed to run coordination: {:?}",
581                    e.as_report()
582                )))
583            }
584        }
585    }
586
587    async fn try_handle_init_requests(
588        &mut self,
589        pending_handle_ids: &HashSet<HandleId>,
590        two_phase_handler: &mut TwoPhaseCommitHandler,
591    ) -> anyhow::Result<()> {
592        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
593        if two_phase_handler.has_uncommitted_schema_change() {
594            // Delay handling init requests until all pending epochs are flushed.
595            self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
596        } else {
597            self.handle_init_requests_impl(pending_handle_ids.clone())
598                .await?;
599        }
600        Ok(())
601    }
602
603    async fn handle_init_requests_impl(
604        &mut self,
605        pending_handle_ids: impl IntoIterator<Item = HandleId>,
606    ) -> anyhow::Result<()> {
607        let log_store_rewind_start_epoch = self.last_writer_acked_epoch;
608        self.handle_manager
609            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
610        if log_store_rewind_start_epoch.is_none() {
611            let mut align_requests = AligningRequests::default();
612            while !align_requests.aligned() {
613                let (handle_id, event) = self.handle_manager.next_event().await?;
614                match event {
615                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
616                        align_requests.add_new_request(
617                            handle_id,
618                            initial_epoch,
619                            self.handle_manager.vnode_bitmap(handle_id),
620                        )?;
621                    }
622                    other => {
623                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
624                    }
625                }
626            }
627            let aligned_initial_epoch = align_requests
628                .requests
629                .into_iter()
630                .max()
631                .expect("non-empty");
632            self.handle_manager
633                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
634        }
635        Ok(())
636    }
637
638    async fn next_event(
639        &mut self,
640        two_phase_handler: &mut TwoPhaseCommitHandler,
641    ) -> anyhow::Result<CoordinatorWorkerEvent> {
642        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
643            && two_phase_handler.is_empty()
644        {
645            let pending_handle_ids = pending_handle_ids.clone();
646            self.handle_init_requests_impl(pending_handle_ids).await?;
647            self.curr_state = CoordinatorWorkerState::Running;
648        }
649
650        select! {
651            next_handle_event = self.handle_manager.next_event() => {
652                let (handle_id, event) = next_handle_event?;
653                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
654            }
655
656            next_item_to_commit = two_phase_handler.next_to_commit() => {
657                let (epoch, metadata, schema_change) = next_item_to_commit?;
658                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
659            }
660        }
661    }
662
663    async fn run_coordination(
664        &mut self,
665        db: DatabaseConnection,
666        mut coordinator: SinkCommitCoordinator,
667        subscriber: SinkCommittedEpochSubscriber,
668    ) -> anyhow::Result<()> {
669        let sink_id = self.handle_manager.param.sink_id;
670
671        let mut two_phase_handler = self
672            .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
673            .await?;
674        match &mut coordinator {
675            SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
676            SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
677        }
678
679        let mut running_handles = self.handle_manager.wait_init_handles().await?;
680        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
681            .await?;
682
683        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
684        let mut pending_new_handles = vec![];
685        loop {
686            let event = self.next_event(&mut two_phase_handler).await?;
687            let (handle_id, epoch, commit_request) = match event {
688                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
689                    CoordinationHandleManagerEvent::NewHandle => {
690                        pending_new_handles.push(handle_id);
691                        continue;
692                    }
693                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
694                        running_handles = self
695                            .handle_manager
696                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
697                            .await?;
698                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
699                            .await?;
700                        continue;
701                    }
702                    CoordinationHandleManagerEvent::Stop => {
703                        self.handle_manager.stop_handle(handle_id)?;
704                        running_handles = self
705                            .handle_manager
706                            .alter_parallelisms(pending_new_handles.drain(..))
707                            .await?;
708                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
709                            .await?;
710
711                        continue;
712                    }
713                    CoordinationHandleManagerEvent::CommitRequest {
714                        epoch,
715                        metadata,
716                        schema_change,
717                    } => (handle_id, epoch, (metadata, schema_change)),
718                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
719                        bail!("receive AlignInitialEpoch after initialization")
720                    }
721                },
722                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
723                    let start_time = Instant::now();
724                    let commit_fut = async {
725                        match &mut coordinator {
726                            SinkCommitCoordinator::SinglePhase(coordinator) => {
727                                assert!(metadata.is_none());
728                                if let Some(schema_change) = schema_change {
729                                    coordinator
730                                        .commit_schema_change(epoch, schema_change)
731                                        .instrument_await(Self::commit_span(
732                                            "single_phase_schema_change",
733                                            sink_id,
734                                            epoch,
735                                        ))
736                                        .await?;
737                                }
738                            }
739                            SinkCommitCoordinator::TwoPhase(coordinator) => {
740                                if let Some(metadata) = metadata {
741                                    coordinator
742                                        .commit_data(epoch, metadata)
743                                        .instrument_await(Self::commit_span(
744                                            "two_phase_commit_data",
745                                            sink_id,
746                                            epoch,
747                                        ))
748                                        .await?;
749                                }
750                                if let Some(schema_change) = schema_change {
751                                    coordinator
752                                        .commit_schema_change(epoch, schema_change)
753                                        .instrument_await(Self::commit_span(
754                                            "two_phase_commit_schema_change",
755                                            sink_id,
756                                            epoch,
757                                        ))
758                                        .await?;
759                                }
760                            }
761                        }
762                        Ok(())
763                    };
764                    let commit_res =
765                        run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
766                            warn!(
767                                elapsed = ?start_time.elapsed(),
768                                %sink_id,
769                                "committing"
770                            );
771                        })
772                        .await;
773
774                    match commit_res {
775                        Ok(_) => {
776                            two_phase_handler.ack_committed(epoch).await?;
777                        }
778                        Err(e) => {
779                            two_phase_handler.failed_committed(epoch, e);
780                        }
781                    }
782
783                    continue;
784                }
785            };
786            if !running_handles.contains(&handle_id) {
787                bail!(
788                    "receiving commit request from non-running handle {}, running handles: {:?}",
789                    handle_id,
790                    running_handles
791                );
792            }
793            pending_epochs.entry(epoch).or_default().add_new_request(
794                handle_id,
795                commit_request,
796                self.handle_manager.vnode_bitmap(handle_id),
797            )?;
798            if pending_epochs
799                .first_key_value()
800                .expect("non-empty")
801                .1
802                .aligned()
803            {
804                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
805                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
806                let mut requests = commit_requests.requests.into_iter();
807                let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
808                metadatas.push(first_metadata);
809                for (metadata, schema_change) in requests {
810                    if first_schema_change != schema_change {
811                        return Err(anyhow!(
812                            "got different schema change {:?} to prev schema change {:?}",
813                            schema_change,
814                            first_schema_change
815                        ));
816                    }
817                    metadatas.push(metadata);
818                }
819
820                match &mut coordinator {
821                    SinkCommitCoordinator::SinglePhase(coordinator) => {
822                        if !metadatas.is_empty() {
823                            let start_time = Instant::now();
824                            run_future_with_periodic_fn(
825                                coordinator.commit_data(epoch, metadatas).instrument_await(
826                                    Self::commit_span("single_phase_commit_data", sink_id, epoch),
827                                ),
828                                Duration::from_secs(5),
829                                || {
830                                    warn!(
831                                        elapsed = ?start_time.elapsed(),
832                                        %sink_id,
833                                        "committing"
834                                    );
835                                },
836                            )
837                            .await
838                            .map_err(|e| anyhow!(e))?;
839                        }
840                        if first_schema_change.is_some() {
841                            persist_pre_commit_metadata(
842                                &db,
843                                sink_id as _,
844                                epoch,
845                                None,
846                                first_schema_change.as_ref(),
847                            )
848                            .await?;
849                            two_phase_handler.push_new_item(epoch, None, first_schema_change);
850                        }
851                    }
852                    SinkCommitCoordinator::TwoPhase(coordinator) => {
853                        let commit_metadata = coordinator
854                            .pre_commit(epoch, metadatas, first_schema_change.clone())
855                            .instrument_await(Self::commit_span(
856                                "two_phase_pre_commit",
857                                sink_id,
858                                epoch,
859                            ))
860                            .await?;
861                        if commit_metadata.is_some() || first_schema_change.is_some() {
862                            persist_pre_commit_metadata(
863                                &db,
864                                sink_id as _,
865                                epoch,
866                                commit_metadata.clone(),
867                                first_schema_change.as_ref(),
868                            )
869                            .await?;
870                            two_phase_handler.push_new_item(
871                                epoch,
872                                commit_metadata,
873                                first_schema_change,
874                            );
875                        }
876                    }
877                }
878
879                self.handle_manager
880                    .ack_commit(epoch, commit_requests.handle_ids)?;
881                self.last_writer_acked_epoch = Some(epoch);
882            }
883        }
884    }
885
886    /// Return `TwoPhaseCommitHandler` initialized from the persisted state in the meta store.
887    async fn init_state_from_store(
888        &mut self,
889        db: &DatabaseConnection,
890        sink_id: SinkId,
891        subscriber: SinkCommittedEpochSubscriber,
892        coordinator: &mut SinkCommitCoordinator,
893    ) -> anyhow::Result<TwoPhaseCommitHandler> {
894        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;
895
896        let mut metadata_iter = ordered_metadata.into_iter().peekable();
897        let last_committed_epoch = metadata_iter
898            .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
899            .map(|(epoch, _, _, _)| epoch);
900
901        let pending_items = metadata_iter
902            .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
903            .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
904            .collect_vec();
905        self.last_writer_acked_epoch = pending_items
906            .last()
907            .map(|(epoch, _, _)| *epoch)
908            .or(last_committed_epoch);
909
910        let mut aborted_epochs = vec![];
911
912        for (epoch, state, metadata, _) in metadata_iter {
913            match state {
914                SinkState::Aborted => {
915                    if let Some(metadata) = metadata
916                        && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
917                    {
918                        coordinator.abort(epoch, metadata).await;
919                    }
920                    aborted_epochs.push(epoch);
921                }
922                other => {
923                    unreachable!(
924                        "unexpected state {:?} after pending items at epoch {}",
925                        other, epoch
926                    );
927                }
928            }
929        }
930
931        // Records for all aborted epochs and previously committed epochs are no longer needed.
932        clean_aborted_records(db, sink_id, aborted_epochs).await?;
933
934        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
935        let mut two_phase_handler = TwoPhaseCommitHandler::new(
936            db.clone(),
937            sink_id,
938            initial_hummock_committed_epoch,
939            job_committed_epoch_rx,
940            last_committed_epoch,
941        );
942
943        for (epoch, metadata, schema_change) in pending_items {
944            two_phase_handler.push_new_item(epoch, metadata, schema_change);
945        }
946
947        Ok(two_phase_handler)
948    }
949
950    fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
951        await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
952    }
953}