risingwave_meta/manager/sink_coordination/
coordinator_worker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::{Either, pending, select};
25use futures::pin_mut;
26use itertools::Itertools;
27use risingwave_common::bail;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::SinkId;
32use risingwave_connector::sink::{
33    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
34};
35use risingwave_meta_model::pending_sink_state::SinkState;
36use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
37use risingwave_pb::stream_plan::PbSinkSchemaChange;
38use sea_orm::DatabaseConnection;
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::time::sleep;
43use tokio_retry::strategy::{ExponentialBackoff, jitter};
44use tonic::Status;
45use tracing::{error, warn};
46
47use crate::manager::sink_coordination::exactly_once_util::{
48    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
49    persist_pre_commit_metadata,
50};
51use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
52
53async fn run_future_with_periodic_fn<F: Future>(
54    future: F,
55    interval: Duration,
56    mut f: impl FnMut(),
57) -> F::Output {
58    pin_mut!(future);
59    loop {
60        match select(&mut future, pin!(sleep(interval))).await {
61            Either::Left((output, _)) => {
62                break output;
63            }
64            Either::Right(_) => f(),
65        }
66    }
67}
68
/// Identifier assigned by `CoordinationHandleManager` to each connected
/// writer handle.
type HandleId = usize;

/// Accumulates requests from multiple writer handles until the union of
/// their vnode bitmaps covers every vnode, i.e. the requests are aligned
/// across the sink's full parallelism.
#[derive(Default)]
struct AligningRequests<R> {
    // Requests received so far, in arrival order.
    requests: Vec<R>,
    // Ids of the handles that have already contributed a request.
    handle_ids: HashSet<HandleId>,
    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
}
77
78impl<R> AligningRequests<R> {
79    fn add_new_request(
80        &mut self,
81        handle_id: HandleId,
82        request: R,
83        vnode_bitmap: &Bitmap,
84    ) -> anyhow::Result<()>
85    where
86        R: Debug,
87    {
88        let committed_bitmap = self
89            .committed_bitmap
90            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
91        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
92
93        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
94        if check_bitmap.count_ones() > 0 {
95            return Err(anyhow!(
96                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
97                check_bitmap.iter_ones().collect_vec(),
98                vnode_bitmap,
99                committed_bitmap,
100                self.requests,
101                request
102            ));
103        }
104        *committed_bitmap |= vnode_bitmap;
105        self.requests.push(request);
106        assert!(self.handle_ids.insert(handle_id));
107        Ok(())
108    }
109
110    fn aligned(&self) -> bool {
111        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
112    }
113}
114
/// One backoff step: a boxed, pinned sleep future awaited before retrying a
/// failed sink commit.
type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
/// Infinite iterator yielding successive backoff futures (jittered
/// exponential delays — see `get_retry_backoff_strategy`).
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;
117
/// Tracks epochs moving through the sink's two-phase commit flow.
///
/// An item first sits in `pending_epochs` (waiting for hummock to commit its
/// epoch), is then promoted to `prepared_epochs` (ready to commit to the
/// external sink), and is finally removed once the sink commit is persisted.
struct TwoPhaseCommitHandler {
    // Metadata store connection used to persist/prune commit state.
    db: DatabaseConnection,
    sink_id: SinkId,
    // Latest hummock committed epoch observed from `job_committed_epoch_rx`.
    curr_hummock_committed_epoch: u64,
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    // The epoch most recently committed to the external sink, if any.
    last_committed_epoch: Option<u64>,
    // (epoch, metadata, schema_change) items not yet committed by hummock;
    // kept in strictly increasing epoch order.
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    // Items ready to be committed to the external sink, in strictly
    // increasing epoch order.
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    // Present while a failed commit is being retried: the currently armed
    // backoff future plus the strategy producing subsequent ones.
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}
128
impl TwoPhaseCommitHandler {
    /// Create a handler seeded with the current hummock committed epoch and
    /// the last epoch (if any) that was previously committed to the sink.
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    /// Infinite retry backoff: exponential starting at 10ms, capped at 60s,
    /// with jitter applied to each delay.
    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

    /// Wait until the front prepared epoch may be committed to the external
    /// sink and return it.
    ///
    /// Readiness requires (a) the epoch to have been promoted to
    /// `prepared_epochs` (its epoch <= a hummock committed epoch received on
    /// `job_committed_epoch_rx`) and (b) the retry backoff, if armed, to have
    /// elapsed. The item is returned by clone and stays at the queue front
    /// until `ack_committed` removes it, so failed commits can be retried.
    async fn next_to_commit(
        &mut self,
    ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
        loop {
            // Resolves immediately when something is prepared and no backoff
            // is armed; waits out the backoff otherwise; pends forever when
            // nothing is prepared (so only the recv branch can fire).
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let item = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok(item);
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    // Promote every pending item whose epoch hummock has now
                    // committed, preserving the increasing-epoch invariant.
                    while let Some((epoch, metadata, schema_change)) = self.pending_epochs.pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch) {
                        if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                            assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
                        }
                        self.prepared_epochs.push_back((epoch, metadata, schema_change));
                    }
                }
            }
        }
    }

    /// Enqueue a new item for `epoch`. If hummock has already committed past
    /// `epoch`, the item is immediately prepared; otherwise it waits in
    /// `pending_epochs`. Items must be pushed in increasing epoch order.
    fn push_new_item(
        &mut self,
        epoch: u64,
        metadata: Option<Vec<u8>>,
        schema_change: Option<PbSinkSchemaChange>,
    ) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs
                .push_back((epoch, metadata, schema_change));
        } else {
            // Skipping the pending queue is only valid when nothing older is
            // still waiting there.
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs
                .push_back((epoch, metadata, schema_change));
        }
    }

    /// Record that `epoch` was successfully committed to the external sink:
    /// clear any retry backoff, pop the front prepared item (which must be
    /// `epoch`), and persist the commit while pruning state up to the
    /// previously committed epoch.
    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

    /// Record that committing `epoch` failed: arm (or advance) the retry
    /// backoff so `next_to_commit` re-yields the same front item after the
    /// delay. The item itself is left in place for the retry.
    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch,);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, Retrying after backoff",
            epoch,
        );
    }

    /// True when nothing is pending or prepared, i.e. all received items have
    /// been fully committed.
    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }

    /// Whether there exists an uncommitted schema change.
    ///
    /// Per current design, if a `schema_change` exists, it should be attached to the latest
    /// uncommitted item across `pending_epochs` and `prepared_epochs`.
    fn has_uncommitted_schema_change(&self) -> bool {
        if let Some((_, _, schema_change)) = self.pending_epochs.back() {
            schema_change.is_some()
        } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
            schema_change.is_some()
        } else {
            false
        }
    }
}
267
/// Owns all connected sink writer handles for one sink, plus the channel on
/// which newly connected handles arrive.
struct CoordinationHandleManager {
    // Sink parameters; mismatching params from a new handle are only warned.
    param: SinkParam,
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    // Monotonically increasing id assigned to each newly connected handle.
    next_handle_id: HandleId,
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}
274
275impl CoordinationHandleManager {
276    fn start(
277        &mut self,
278        log_store_rewind_start_epoch: Option<u64>,
279        handle_ids: impl IntoIterator<Item = HandleId>,
280    ) -> anyhow::Result<()> {
281        for handle_id in handle_ids {
282            let handle = self
283                .writer_handles
284                .get_mut(&handle_id)
285                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
286            handle.start(log_store_rewind_start_epoch).map_err(|_| {
287                anyhow!(
288                    "fail to start {:?} for handle {}",
289                    log_store_rewind_start_epoch,
290                    handle_id
291                )
292            })?;
293        }
294        Ok(())
295    }
296
297    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
298        for (handle_id, handle) in &mut self.writer_handles {
299            handle
300                .ack_aligned_initial_epoch(aligned_initial_epoch)
301                .map_err(|_| {
302                    anyhow!(
303                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
304                        aligned_initial_epoch,
305                        handle_id
306                    )
307                })?;
308        }
309        Ok(())
310    }
311
312    fn ack_commit(
313        &mut self,
314        epoch: u64,
315        handle_ids: impl IntoIterator<Item = HandleId>,
316    ) -> anyhow::Result<()> {
317        for handle_id in handle_ids {
318            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
319                anyhow!(
320                    "fail to find handle for {} when ack commit on epoch {}",
321                    handle_id,
322                    epoch
323                )
324            })?;
325            handle.ack_commit(epoch).map_err(|_| {
326                anyhow!(
327                    "fail to ack commit on epoch {} for handle {}",
328                    epoch,
329                    handle_id
330                )
331            })?;
332        }
333        Ok(())
334    }
335
336    async fn next_request_inner(
337        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
338    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
339        poll_fn(|cx| {
340            for (handle_id, handle) in writer_handles.iter_mut() {
341                if let Poll::Ready(result) = handle.poll_next_request(cx) {
342                    return Poll::Ready(result.map(|request| (*handle_id, request)));
343                }
344            }
345            Poll::Pending
346        })
347        .await
348    }
349}
350
/// Events surfaced by `CoordinationHandleManager::next_event`.
enum CoordinationHandleManagerEvent {
    /// A new writer handle connected on `request_rx`.
    NewHandle,
    /// An existing handle reported an updated vnode bitmap.
    UpdateVnodeBitmap,
    /// A handle requested to stop.
    Stop,
    /// A handle submitted its commit payload for `epoch`.
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        schema_change: Option<PbSinkSchemaChange>,
    },
    /// A handle reported its initial epoch for startup alignment.
    AlignInitialEpoch(u64),
}
362
363impl CoordinationHandleManagerEvent {
364    fn name(&self) -> &'static str {
365        match self {
366            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
367            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
368            CoordinationHandleManagerEvent::Stop => "Stop",
369            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
370            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
371        }
372    }
373}
374
impl CoordinationHandleManager {
    /// Wait for the next handle event: either a brand-new handle arriving on
    /// `request_rx` (registered under a fresh id) or a request from an
    /// already-connected handle.
    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
        select! {
            handle = self.request_rx.recv() => {
                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
                // A param mismatch is logged but tolerated.
                if handle.param() != &self.param {
                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
                }
                let handle_id = self.next_handle_id;
                self.next_handle_id += 1;
                self.writer_handles.insert(handle_id, handle);
                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
            }
            result = Self::next_request_inner(&mut self.writer_handles) => {
                let (handle_id, request) = result?;
                let event = match request {
                    coordinate_request::Msg::CommitRequest(request) => {
                        CoordinationHandleManagerEvent::CommitRequest {
                            epoch: request.epoch,
                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
                            schema_change: request.schema_change,
                        }
                    }
                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
                    }
                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
                    }
                    coordinate_request::Msg::Stop(_) => {
                        CoordinationHandleManagerEvent::Stop
                    }
                    coordinate_request::Msg::StartRequest(_) => {
                        // StartRequest is consumed before the handle reaches
                        // this loop.
                        unreachable!("should have been handled");
                    }
                };
                Ok((handle_id, event))
            }
        }
    }

    /// Current vnode bitmap of `handle_id`. Panics if the handle is unknown.
    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
        self.writer_handles[&handle_id].vnode_bitmap()
    }

    /// Remove the handle from the map and invoke its `stop`.
    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
        self.writer_handles
            .remove(&handle_id)
            .expect("should exist")
            .stop()
    }

    /// Wait until the initial set of connecting handles together covers all
    /// vnodes; any other event during init is an error. Returns their ids.
    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
        assert!(self.writer_handles.is_empty());
        let mut init_requests = AligningRequests::default();
        while !init_requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            let unexpected_event = match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                    continue;
                }
                event => event.name(),
            };
            return Err(anyhow!(
                "expect new handle during init, but got {}",
                unexpected_event
            ));
        }
        Ok(init_requests.handle_ids)
    }

    /// Handle a parallelism change: starting from the handles in
    /// `altered_handles`, keep absorbing NewHandle / UpdateVnodeBitmap / Stop
    /// events until every pre-existing handle has reported or stopped and the
    /// surviving bitmaps again cover all vnodes. Commit or align requests in
    /// this window are protocol errors. Returns the ids of the new running set.
    async fn alter_parallelisms(
        &mut self,
        altered_handles: impl Iterator<Item = HandleId>,
    ) -> anyhow::Result<HashSet<HandleId>> {
        let mut requests = AligningRequests::default();
        for handle_id in altered_handles {
            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
        }
        // Handles that have not yet told us their fate after the change.
        let mut remaining_handles: HashSet<_> = self
            .writer_handles
            .keys()
            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
            .cloned()
            .collect();
        while !remaining_handles.is_empty() || !requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    assert!(remaining_handles.remove(&handle_id));
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::Stop => {
                    assert!(remaining_handles.remove(&handle_id));
                    self.stop_handle(handle_id)?;
                }
                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
                    bail!(
                        "receive commit request on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
                    bail!(
                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
            }
        }
        Ok(requests.handle_ids)
    }
}
494
/// Represents the coordinator worker's state machine for handling schema changes.
///
/// - `Running`: Normal operation, handles can be started immediately
/// - `WaitingForFlushed`: Waiting for all pending two-phase commits to complete before starting new handles. This
///   ensures new sink executors load the correct schema.
enum CoordinatorWorkerState {
    Running,
    /// Carries the handle ids whose start is deferred until the two-phase
    /// handler is fully drained.
    WaitingForFlushed(HashSet<HandleId>),
}
504
/// Coordinates commits for a single sink job: manages writer handles, aligns
/// per-epoch commit requests across vnodes, and drives (two-phase) commits.
pub struct CoordinatorWorker {
    handle_manager: CoordinationHandleManager,
    // The latest epoch successfully committed to the sink; used as the log
    // store rewind start epoch when (re)starting handles.
    prev_committed_epoch: Option<u64>,
    curr_state: CoordinatorWorkerState,
}
510
/// Event returned by `CoordinatorWorker::next_event`: either a handle-level
/// event, or an epoch that is ready to be committed to the external sink.
enum CoordinatorWorkerEvent {
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    /// (epoch, metadata, schema_change) ready for the sink commit.
    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
}
515
516impl CoordinatorWorker {
    /// Task entry point: build the sink and its commit coordinator from
    /// `param`, then run the coordination loop until it finishes. Build
    /// failures are logged and the task exits without serving any handle.
    #[allow(clippy::large_stack_frames)]
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        // Dispatch to the concrete sink type to build its coordinator.
        dispatch_sink!(sink, sink, {
            let coordinator =
                match Box::pin(sink.new_coordinator(Some(iceberg_compact_stat_sender))).await {
                    Ok(coordinator) => coordinator,
                    Err(e) => {
                        error!(
                            error = %e.as_report(),
                            "unable to build coordinator with param {:?}",
                            param
                        );
                        return;
                    }
                };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }
553
    /// Construct the worker state and run the coordination loop; if the loop
    /// fails, abort every remaining writer handle with the failure message.
    pub async fn execute_coordinator(
        db: DatabaseConnection,
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) {
        let mut worker = CoordinatorWorker {
            handle_manager: CoordinationHandleManager {
                param,
                writer_handles: HashMap::new(),
                next_handle_id: 0,
                request_rx,
            },
            prev_committed_epoch: None,
            curr_state: CoordinatorWorkerState::Running,
        };

        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
            // Propagate the failure to all still-connected writers so they
            // don't hang waiting for coordination responses.
            for handle in worker.handle_manager.writer_handles.into_values() {
                handle.abort(Status::internal(format!(
                    "failed to run coordination: {:?}",
                    e.as_report()
                )))
            }
        }
    }
581
582    async fn try_handle_init_requests(
583        &mut self,
584        pending_handle_ids: &HashSet<HandleId>,
585        two_phase_handler: &mut TwoPhaseCommitHandler,
586    ) -> anyhow::Result<()> {
587        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
588        if two_phase_handler.has_uncommitted_schema_change() {
589            // Delay handling init requests until all pending epochs are flushed.
590            self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
591        } else {
592            self.handle_init_requests_impl(pending_handle_ids.clone())
593                .await?;
594        }
595        Ok(())
596    }
597
    /// Actually start the given handles.
    ///
    /// When there is no previous committed epoch (fresh start, so no log
    /// store rewind), additionally collect an `AlignInitialEpoch` report
    /// covering every vnode, take the maximum as the aligned initial epoch,
    /// and broadcast the acknowledgement to all handles.
    async fn handle_init_requests_impl(
        &mut self,
        pending_handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        let log_store_rewind_start_epoch = self.prev_committed_epoch;
        self.handle_manager
            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
        if log_store_rewind_start_epoch.is_none() {
            let mut align_requests = AligningRequests::default();
            while !align_requests.aligned() {
                let (handle_id, event) = self.handle_manager.next_event().await?;
                match event {
                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
                        align_requests.add_new_request(
                            handle_id,
                            initial_epoch,
                            self.handle_manager.vnode_bitmap(handle_id),
                        )?;
                    }
                    other => {
                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
                    }
                }
            }
            // All writers must resume from the same epoch; use the largest
            // reported initial epoch.
            let aligned_initial_epoch = align_requests
                .requests
                .into_iter()
                .max()
                .expect("non-empty");
            self.handle_manager
                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
        }
        Ok(())
    }
632
    /// Wait for the next worker event.
    ///
    /// Before waiting, if handle starts were deferred for a schema change and
    /// the two-phase handler has fully drained, perform the deferred start
    /// and transition back to `Running`.
    async fn next_event(
        &mut self,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<CoordinatorWorkerEvent> {
        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
            && two_phase_handler.is_empty()
        {
            let pending_handle_ids = pending_handle_ids.clone();
            self.handle_init_requests_impl(pending_handle_ids).await?;
            self.curr_state = CoordinatorWorkerState::Running;
        }

        select! {
            next_handle_event = self.handle_manager.next_event() => {
                let (handle_id, event) = next_handle_event?;
                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
            }

            next_item_to_commit = two_phase_handler.next_to_commit() => {
                let (epoch, metadata, schema_change) = next_item_to_commit?;
                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
            }
        }
    }
657
658    async fn run_coordination(
659        &mut self,
660        db: DatabaseConnection,
661        mut coordinator: SinkCommitCoordinator,
662        subscriber: SinkCommittedEpochSubscriber,
663    ) -> anyhow::Result<()> {
664        let sink_id = self.handle_manager.param.sink_id;
665
666        let mut two_phase_handler = self
667            .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
668            .await?;
669        match &mut coordinator {
670            SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
671            SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
672        }
673
674        let mut running_handles = self.handle_manager.wait_init_handles().await?;
675        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
676            .await?;
677
678        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
679        let mut pending_new_handles = vec![];
680        loop {
681            let event = self.next_event(&mut two_phase_handler).await?;
682            let (handle_id, epoch, commit_request) = match event {
683                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
684                    CoordinationHandleManagerEvent::NewHandle => {
685                        pending_new_handles.push(handle_id);
686                        continue;
687                    }
688                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
689                        running_handles = self
690                            .handle_manager
691                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
692                            .await?;
693                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
694                            .await?;
695                        continue;
696                    }
697                    CoordinationHandleManagerEvent::Stop => {
698                        self.handle_manager.stop_handle(handle_id)?;
699                        running_handles = self
700                            .handle_manager
701                            .alter_parallelisms(pending_new_handles.drain(..))
702                            .await?;
703                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
704                            .await?;
705
706                        continue;
707                    }
708                    CoordinationHandleManagerEvent::CommitRequest {
709                        epoch,
710                        metadata,
711                        schema_change,
712                    } => (handle_id, epoch, (metadata, schema_change)),
713                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
714                        bail!("receive AlignInitialEpoch after initialization")
715                    }
716                },
717                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
718                    let start_time = Instant::now();
719                    let commit_fut = async {
720                        match &mut coordinator {
721                            SinkCommitCoordinator::SinglePhase(coordinator) => {
722                                assert!(metadata.is_none());
723                                if let Some(schema_change) = schema_change {
724                                    coordinator
725                                        .commit_schema_change(epoch, schema_change)
726                                        .instrument_await(Self::commit_span(
727                                            "single_phase_schema_change",
728                                            sink_id,
729                                            epoch,
730                                        ))
731                                        .await?;
732                                }
733                            }
734                            SinkCommitCoordinator::TwoPhase(coordinator) => {
735                                if let Some(metadata) = metadata {
736                                    coordinator
737                                        .commit_data(epoch, metadata)
738                                        .instrument_await(Self::commit_span(
739                                            "two_phase_commit_data",
740                                            sink_id,
741                                            epoch,
742                                        ))
743                                        .await?;
744                                }
745                                if let Some(schema_change) = schema_change {
746                                    coordinator
747                                        .commit_schema_change(epoch, schema_change)
748                                        .instrument_await(Self::commit_span(
749                                            "two_phase_commit_schema_change",
750                                            sink_id,
751                                            epoch,
752                                        ))
753                                        .await?;
754                                }
755                            }
756                        }
757                        Ok(())
758                    };
759                    let commit_res =
760                        run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
761                            warn!(
762                                elapsed = ?start_time.elapsed(),
763                                %sink_id,
764                                "committing"
765                            );
766                        })
767                        .await;
768
769                    match commit_res {
770                        Ok(_) => {
771                            two_phase_handler.ack_committed(epoch).await?;
772                            self.prev_committed_epoch = Some(epoch);
773                        }
774                        Err(e) => {
775                            two_phase_handler.failed_committed(epoch, e);
776                        }
777                    }
778
779                    continue;
780                }
781            };
782            if !running_handles.contains(&handle_id) {
783                bail!(
784                    "receiving commit request from non-running handle {}, running handles: {:?}",
785                    handle_id,
786                    running_handles
787                );
788            }
789            pending_epochs.entry(epoch).or_default().add_new_request(
790                handle_id,
791                commit_request,
792                self.handle_manager.vnode_bitmap(handle_id),
793            )?;
794            if pending_epochs
795                .first_key_value()
796                .expect("non-empty")
797                .1
798                .aligned()
799            {
800                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
801                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
802                let mut requests = commit_requests.requests.into_iter();
803                let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
804                metadatas.push(first_metadata);
805                for (metadata, schema_change) in requests {
806                    if first_schema_change != schema_change {
807                        return Err(anyhow!(
808                            "got different schema change {:?} to prev schema change {:?}",
809                            schema_change,
810                            first_schema_change
811                        ));
812                    }
813                    metadatas.push(metadata);
814                }
815
816                match &mut coordinator {
817                    SinkCommitCoordinator::SinglePhase(coordinator) => {
818                        if !metadatas.is_empty() {
819                            let start_time = Instant::now();
820                            run_future_with_periodic_fn(
821                                coordinator.commit_data(epoch, metadatas).instrument_await(
822                                    Self::commit_span("single_phase_commit_data", sink_id, epoch),
823                                ),
824                                Duration::from_secs(5),
825                                || {
826                                    warn!(
827                                        elapsed = ?start_time.elapsed(),
828                                        %sink_id,
829                                        "committing"
830                                    );
831                                },
832                            )
833                            .await
834                            .map_err(|e| anyhow!(e))?;
835                        }
836                        if first_schema_change.is_some() {
837                            persist_pre_commit_metadata(
838                                &db,
839                                sink_id as _,
840                                epoch,
841                                None,
842                                first_schema_change.as_ref(),
843                            )
844                            .await?;
845                            two_phase_handler.push_new_item(epoch, None, first_schema_change);
846                        } else {
847                            self.prev_committed_epoch = Some(epoch);
848                        }
849                    }
850                    SinkCommitCoordinator::TwoPhase(coordinator) => {
851                        let commit_metadata = coordinator
852                            .pre_commit(epoch, metadatas, first_schema_change.clone())
853                            .instrument_await(Self::commit_span(
854                                "two_phase_pre_commit",
855                                sink_id,
856                                epoch,
857                            ))
858                            .await?;
859                        if commit_metadata.is_some() || first_schema_change.is_some() {
860                            persist_pre_commit_metadata(
861                                &db,
862                                sink_id as _,
863                                epoch,
864                                commit_metadata.clone(),
865                                first_schema_change.as_ref(),
866                            )
867                            .await?;
868                            two_phase_handler.push_new_item(
869                                epoch,
870                                commit_metadata,
871                                first_schema_change,
872                            );
873                        } else {
874                            // No data to commit and no schema change.
875                            self.prev_committed_epoch = Some(epoch);
876                        }
877                    }
878                }
879
880                self.handle_manager
881                    .ack_commit(epoch, commit_requests.handle_ids)?;
882            }
883        }
884    }
885
886    /// Return `TwoPhaseCommitHandler` initialized from the persisted state in the meta store.
887    async fn init_state_from_store(
888        &mut self,
889        db: &DatabaseConnection,
890        sink_id: SinkId,
891        subscriber: SinkCommittedEpochSubscriber,
892        coordinator: &mut SinkCommitCoordinator,
893    ) -> anyhow::Result<TwoPhaseCommitHandler> {
894        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;
895
896        let mut metadata_iter = ordered_metadata.into_iter().peekable();
897        let last_committed_epoch = metadata_iter
898            .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
899            .map(|(epoch, _, _, _)| epoch);
900        self.prev_committed_epoch = last_committed_epoch;
901
902        let pending_items = metadata_iter
903            .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
904            .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
905            .collect_vec();
906
907        let mut aborted_epochs = vec![];
908
909        for (epoch, state, metadata, _) in metadata_iter {
910            match state {
911                SinkState::Aborted => {
912                    if let Some(metadata) = metadata
913                        && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
914                    {
915                        coordinator.abort(epoch, metadata).await;
916                    }
917                    aborted_epochs.push(epoch);
918                }
919                other => {
920                    unreachable!(
921                        "unexpected state {:?} after pending items at epoch {}",
922                        other, epoch
923                    );
924                }
925            }
926        }
927
928        // Records for all aborted epochs and previously committed epochs are no longer needed.
929        clean_aborted_records(db, sink_id, aborted_epochs).await?;
930
931        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
932        let mut two_phase_handler = TwoPhaseCommitHandler::new(
933            db.clone(),
934            sink_id,
935            initial_hummock_committed_epoch,
936            job_committed_epoch_rx,
937            last_committed_epoch,
938        );
939
940        for (epoch, metadata, schema_change) in pending_items {
941            two_phase_handler.push_new_item(epoch, metadata, schema_change);
942        }
943
944        Ok(two_phase_handler)
945    }
946
947    fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
948        await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
949    }
950}