risingwave_meta/manager/sink_coordination/coordinator_worker.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::{Future, poll_fn};
use std::pin::pin;
use std::task::Poll;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::future::{Either, pending, select};
use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{
    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam, build_sink,
};
use risingwave_meta_model::pending_sink_state::SinkState;
use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::sleep;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tonic::Status;
use tracing::{error, warn};

use crate::manager::sink_coordination::exactly_once_util::{
    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
    persist_pre_commit_metadata,
};
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

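/// Drives `future` to completion while invoking `f` every `interval` until the future resolves.
/// Used below to emit periodic progress logs for long-running commit calls.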
async fn run_future_with_periodic_fn<F: Future>(
    future: F,
    interval: Duration,
    mut f: impl FnMut(),
) -> F::Output {
    pin_mut!(future);
    loop {
        match select(&mut future, pin!(sleep(interval))).await {
            Either::Left((output, _)) => {
                break output;
            }
            Either::Right(_) => f(),
        }
    }
}

type HandleId = usize;

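/// Collects per-writer requests until the union of their vnode bitmaps covers every vnode,
/// i.e. until requests from all sink writer parallelisms for the same event have arrived.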
#[derive(Default)]
struct AligningRequests<R> {
    requests: Vec<R>,
    handle_ids: HashSet<HandleId>,
    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
}

impl<R> AligningRequests<R> {
    fn add_new_request(
        &mut self,
        handle_id: HandleId,
        request: R,
        vnode_bitmap: &Bitmap,
    ) -> anyhow::Result<()>
    where
        R: Debug,
    {
        let committed_bitmap = self
            .committed_bitmap
            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());

        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
        if check_bitmap.count_ones() > 0 {
            return Err(anyhow!(
                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
                check_bitmap.iter_ones().collect_vec(),
                vnode_bitmap,
                committed_bitmap,
                self.requests,
                request
            ));
        }
        *committed_bitmap |= vnode_bitmap;
        self.requests.push(request);
        assert!(self.handle_ids.insert(handle_id));
        Ok(())
    }

    fn aligned(&self) -> bool {
        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
    }
}

type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
type RetryBackoffStrategy = impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

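/// Tracks the two-phase commit progress of a coordinated sink.
///
/// Pre-committed sink metadata is first queued in `pending_epochs`. Once the hummock committed
/// epoch reaches an entry's epoch, the entry is promoted to `prepared_epochs`, from which items
/// are committed to the external sink in order, retrying with exponential backoff on failure.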
struct TwoPhaseCommitHandler {
    db: DatabaseConnection,
    sink_id: SinkId,
    curr_hummock_committed_epoch: u64,
    job_committed_epoch_rx: UnboundedReceiver<u64>,
    last_committed_epoch: Option<u64>,
    pending_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    prepared_epochs: VecDeque<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)>,
    backoff_state: Option<(RetryBackoffFuture, RetryBackoffStrategy)>,
}

impl TwoPhaseCommitHandler {
    fn new(
        db: DatabaseConnection,
        sink_id: SinkId,
        initial_hummock_committed_epoch: u64,
        job_committed_epoch_rx: UnboundedReceiver<u64>,
        last_committed_epoch: Option<u64>,
    ) -> Self {
        Self {
            db,
            sink_id,
            curr_hummock_committed_epoch: initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
            pending_epochs: VecDeque::new(),
            prepared_epochs: VecDeque::new(),
            backoff_state: None,
        }
    }

    #[define_opaque(RetryBackoffStrategy)]
    fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .map(|delay| Box::pin(tokio::time::sleep(delay)))
    }

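    /// Returns the next prepared item that is ready to be committed to the external sink.
    ///
    /// Waits on either the retry backoff of a previously failed commit attempt or a newly
    /// received hummock committed epoch, which promotes matured `pending_epochs` entries to
    /// `prepared_epochs`.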
    async fn next_to_commit(
        &mut self,
    ) -> anyhow::Result<(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>)> {
        loop {
            let wait_backoff = async {
                if self.prepared_epochs.is_empty() {
                    pending::<()>().await;
                } else if let Some((backoff_fut, _)) = &mut self.backoff_state {
                    backoff_fut.await;
                }
            };

            select! {
                _ = wait_backoff => {
                    let item = self.prepared_epochs.front().cloned().expect("non-empty");
                    return Ok(item);
                }

                recv_epoch = self.job_committed_epoch_rx.recv() => {
                    let Some(recv_epoch) = recv_epoch else {
                        return Err(anyhow!(
                            "Hummock committed epoch sender closed unexpectedly"
                        ));
                    };
                    self.curr_hummock_committed_epoch = recv_epoch;
                    while let Some((epoch, metadata, schema_change)) = self.pending_epochs.pop_front_if(|(epoch, _, _)| *epoch <= recv_epoch) {
                        if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                            assert!(epoch > *last_epoch, "prepared epochs must be in increasing order");
                        }
                        self.prepared_epochs.push_back((epoch, metadata, schema_change));
                    }
                }
            }
        }
    }

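    /// Queues a newly pre-committed epoch. Epochs not yet covered by the hummock committed epoch
    /// go to `pending_epochs`; otherwise they are immediately prepared for commit.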
    fn push_new_item(
        &mut self,
        epoch: u64,
        metadata: Option<Vec<u8>>,
        schema_change: Option<PbSinkSchemaChange>,
    ) {
        if epoch > self.curr_hummock_committed_epoch {
            if let Some((last_epoch, _, _)) = self.pending_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "pending epochs must be in increasing order"
                );
            }
            self.pending_epochs
                .push_back((epoch, metadata, schema_change));
        } else {
            assert!(self.pending_epochs.is_empty());
            if let Some((last_epoch, _, _)) = self.prepared_epochs.back() {
                assert!(
                    epoch > *last_epoch,
                    "prepared epochs must be in increasing order"
                );
            }
            self.prepared_epochs
                .push_back((epoch, metadata, schema_change));
        }
    }

    async fn ack_committed(&mut self, epoch: u64) -> anyhow::Result<()> {
        self.backoff_state = None;
        let (last_epoch, _, _) = self.prepared_epochs.pop_front().expect("non-empty");
        assert_eq!(last_epoch, epoch);

        commit_and_prune_epoch(&self.db, self.sink_id, epoch, self.last_committed_epoch).await?;
        self.last_committed_epoch = Some(epoch);
        Ok(())
    }

    fn failed_committed(&mut self, epoch: u64, err: SinkError) {
        assert_eq!(self.prepared_epochs.front().expect("non-empty").0, epoch);
        if let Some((prev_fut, strategy)) = &mut self.backoff_state {
            let new_fut = strategy.next().expect("infinite");
            *prev_fut = new_fut;
        } else {
            let mut strategy = Self::get_retry_backoff_strategy();
            let backoff_fut = strategy.next().expect("infinite");
            self.backoff_state = Some((backoff_fut, strategy));
        }
        tracing::error!(
            error = %err.as_report(),
            %self.sink_id,
            "failed to commit epoch {}, retrying after backoff",
            epoch,
        );
    }

    fn is_empty(&self) -> bool {
        self.pending_epochs.is_empty() && self.prepared_epochs.is_empty()
    }

    /// Whether there exists an uncommitted schema change.
    ///
    /// Per the current design, if a `schema_change` exists, it should be attached to the latest
    /// uncommitted item across `pending_epochs` and `prepared_epochs`.
    fn has_uncommitted_schema_change(&self) -> bool {
        if let Some((_, _, schema_change)) = self.pending_epochs.back() {
            schema_change.is_some()
        } else if let Some((_, _, schema_change)) = self.prepared_epochs.back() {
            schema_change.is_some()
        } else {
            false
        }
    }
}

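/// Manages the coordination handles of all writers of a sink, one handle per sink writer
/// parallelism, and multiplexes the requests received from them.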
struct CoordinationHandleManager {
    param: SinkParam,
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    next_handle_id: HandleId,
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}

impl CoordinationHandleManager {
    fn start(
        &mut self,
        log_store_rewind_start_epoch: Option<u64>,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self
                .writer_handles
                .get_mut(&handle_id)
                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id))?;
            handle.start(log_store_rewind_start_epoch).map_err(|_| {
                anyhow!(
                    "fail to start {:?} for handle {}",
                    log_store_rewind_start_epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
        for (handle_id, handle) in &mut self.writer_handles {
            handle
                .ack_aligned_initial_epoch(aligned_initial_epoch)
                .map_err(|_| {
                    anyhow!(
                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
                        aligned_initial_epoch,
                        handle_id
                    )
                })?;
        }
        Ok(())
    }

    fn ack_commit(
        &mut self,
        epoch: u64,
        handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        for handle_id in handle_ids {
            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
                anyhow!(
                    "fail to find handle for {} when ack commit on epoch {}",
                    handle_id,
                    epoch
                )
            })?;
            handle.ack_commit(epoch).map_err(|_| {
                anyhow!(
                    "fail to ack commit on epoch {} for handle {}",
                    epoch,
                    handle_id
                )
            })?;
        }
        Ok(())
    }

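    /// Polls all writer handles and yields the first request that becomes ready, together with
    /// the id of the handle it came from.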
    async fn next_request_inner(
        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
        poll_fn(|cx| {
            for (handle_id, handle) in writer_handles.iter_mut() {
                if let Poll::Ready(result) = handle.poll_next_request(cx) {
                    return Poll::Ready(result.map(|request| (*handle_id, request)));
                }
            }
            Poll::Pending
        })
        .await
    }
}

enum CoordinationHandleManagerEvent {
    NewHandle,
    UpdateVnodeBitmap,
    Stop,
    CommitRequest {
        epoch: u64,
        metadata: SinkMetadata,
        schema_change: Option<PbSinkSchemaChange>,
    },
    AlignInitialEpoch(u64),
}

impl CoordinationHandleManagerEvent {
    fn name(&self) -> &'static str {
        match self {
            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
            CoordinationHandleManagerEvent::Stop => "Stop",
            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
        }
    }
}

impl CoordinationHandleManager {
    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
        select! {
            handle = self.request_rx.recv() => {
                let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
                if handle.param() != &self.param {
                    warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
                }
                let handle_id = self.next_handle_id;
                self.next_handle_id += 1;
                self.writer_handles.insert(handle_id, handle);
                Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
            }
            result = Self::next_request_inner(&mut self.writer_handles) => {
                let (handle_id, request) = result?;
                let event = match request {
                    coordinate_request::Msg::CommitRequest(request) => {
                        CoordinationHandleManagerEvent::CommitRequest {
                            epoch: request.epoch,
                            metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
                            schema_change: request.schema_change,
                        }
                    }
                    coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
                        CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
                    }
                    coordinate_request::Msg::UpdateVnodeRequest(_) => {
                        CoordinationHandleManagerEvent::UpdateVnodeBitmap
                    }
                    coordinate_request::Msg::Stop(_) => {
                        CoordinationHandleManagerEvent::Stop
                    }
                    coordinate_request::Msg::StartRequest(_) => {
                        unreachable!("should have been handled");
                    }
                };
                Ok((handle_id, event))
            }
        }
    }

    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
        self.writer_handles[&handle_id].vnode_bitmap()
    }

    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
        self.writer_handles
            .remove(&handle_id)
            .expect("should exist")
            .stop()
    }

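    /// Waits for the initial set of writer handles to register until their vnode bitmaps cover
    /// all vnodes, and returns their handle ids.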
    async fn wait_init_handles(&mut self) -> anyhow::Result<HashSet<HandleId>> {
        assert!(self.writer_handles.is_empty());
        let mut init_requests = AligningRequests::default();
        while !init_requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            let unexpected_event = match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                    continue;
                }
                event => event.name(),
            };
            return Err(anyhow!(
                "expect new handle during init, but got {}",
                unexpected_event
            ));
        }
        Ok(init_requests.handle_ids)
    }

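    /// Re-aligns the set of running handles after a scaling (alter parallelism) event: every
    /// previously running handle must either stop or report an updated vnode bitmap, and the
    /// bitmaps of the surviving and newly added handles must again cover all vnodes.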
    async fn alter_parallelisms(
        &mut self,
        altered_handles: impl Iterator<Item = HandleId>,
    ) -> anyhow::Result<HashSet<HandleId>> {
        let mut requests = AligningRequests::default();
        for handle_id in altered_handles {
            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
        }
        let mut remaining_handles: HashSet<_> = self
            .writer_handles
            .keys()
            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
            .cloned()
            .collect();
        while !remaining_handles.is_empty() || !requests.aligned() {
            let (handle_id, event) = self.next_event().await?;
            match event {
                CoordinationHandleManagerEvent::NewHandle => {
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                    assert!(remaining_handles.remove(&handle_id));
                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
                }
                CoordinationHandleManagerEvent::Stop => {
                    assert!(remaining_handles.remove(&handle_id));
                    self.stop_handle(handle_id)?;
                }
                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
                    bail!(
                        "receive commit request on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
                    bail!(
                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
                        epoch,
                        handle_id
                    );
                }
            }
        }
        Ok(requests.handle_ids)
    }
}

/// Represents the coordinator worker's state machine for handling schema changes.
///
/// - `Running`: Normal operation; handles can be started immediately.
/// - `WaitingForFlushed`: Waiting for all pending two-phase commits to complete before starting
///   new handles. This ensures new sink executors load the correct schema.
enum CoordinatorWorkerState {
    Running,
    WaitingForFlushed(HashSet<HandleId>),
}

pub struct CoordinatorWorker {
    handle_manager: CoordinationHandleManager,
    prev_committed_epoch: Option<u64>,
    curr_state: CoordinatorWorkerState,
}

enum CoordinatorWorkerEvent {
    HandleManagerEvent(HandleId, CoordinationHandleManagerEvent),
    ReadyToCommit(u64, Option<Vec<u8>>, Option<PbSinkSchemaChange>),
}

impl CoordinatorWorker {
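    /// Entry point of the coordinator worker: builds the sink from `param`, creates its commit
    /// coordinator, and then runs the coordination loop via `execute_coordinator`.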
    pub async fn run(
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        db: DatabaseConnection,
        subscriber: SinkCommittedEpochSubscriber,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) {
        let sink = match build_sink(param.clone()) {
            Ok(sink) => sink,
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "unable to build sink with param {:?}",
                    param
                );
                return;
            }
        };

        dispatch_sink!(sink, sink, {
            let coordinator = match sink
                .new_coordinator(Some(iceberg_compact_stat_sender))
                .await
            {
                Ok(coordinator) => coordinator,
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "unable to build coordinator with param {:?}",
                        param
                    );
                    return;
                }
            };
            Self::execute_coordinator(db, param, request_rx, coordinator, subscriber).await
        });
    }

    pub async fn execute_coordinator(
        db: DatabaseConnection,
        param: SinkParam,
        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
        coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) {
        let mut worker = CoordinatorWorker {
            handle_manager: CoordinationHandleManager {
                param,
                writer_handles: HashMap::new(),
                next_handle_id: 0,
                request_rx,
            },
            prev_committed_epoch: None,
            curr_state: CoordinatorWorkerState::Running,
        };

        if let Err(e) = worker.run_coordination(db, coordinator, subscriber).await {
            for handle in worker.handle_manager.writer_handles.into_values() {
                handle.abort(Status::internal(format!(
                    "failed to run coordination: {:?}",
                    e.as_report()
                )))
            }
        }
    }

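    /// Starts the given pending handles, unless there is an uncommitted schema change, in which
    /// case the start is deferred until all pending two-phase commits are flushed so that the
    /// new sink executors load the correct schema.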
    async fn try_handle_init_requests(
        &mut self,
        pending_handle_ids: &HashSet<HandleId>,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<()> {
        assert!(matches!(self.curr_state, CoordinatorWorkerState::Running));
        if two_phase_handler.has_uncommitted_schema_change() {
            // Delay handling init requests until all pending epochs are flushed.
            self.curr_state = CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids.clone());
        } else {
            self.handle_init_requests_impl(pending_handle_ids.clone())
                .await?;
        }
        Ok(())
    }

    async fn handle_init_requests_impl(
        &mut self,
        pending_handle_ids: impl IntoIterator<Item = HandleId>,
    ) -> anyhow::Result<()> {
        let log_store_rewind_start_epoch = self.prev_committed_epoch;
        self.handle_manager
            .start(log_store_rewind_start_epoch, pending_handle_ids)?;
        if log_store_rewind_start_epoch.is_none() {
            let mut align_requests = AligningRequests::default();
            while !align_requests.aligned() {
                let (handle_id, event) = self.handle_manager.next_event().await?;
                match event {
                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
                        align_requests.add_new_request(
                            handle_id,
                            initial_epoch,
                            self.handle_manager.vnode_bitmap(handle_id),
                        )?;
                    }
                    other => {
                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
                    }
                }
            }
            let aligned_initial_epoch = align_requests
                .requests
                .into_iter()
                .max()
                .expect("non-empty");
            self.handle_manager
                .ack_aligned_initial_epoch(aligned_initial_epoch)?;
        }
        Ok(())
    }

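    /// Waits for the next coordinator event: either a request from a writer handle or a prepared
    /// epoch that is ready to commit. Also completes a deferred handle start once the two-phase
    /// handler has drained while waiting for flushed commits.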
    async fn next_event(
        &mut self,
        two_phase_handler: &mut TwoPhaseCommitHandler,
    ) -> anyhow::Result<CoordinatorWorkerEvent> {
        if let CoordinatorWorkerState::WaitingForFlushed(pending_handle_ids) = &self.curr_state
            && two_phase_handler.is_empty()
        {
            let pending_handle_ids = pending_handle_ids.clone();
            self.handle_init_requests_impl(pending_handle_ids).await?;
            self.curr_state = CoordinatorWorkerState::Running;
        }

        select! {
            next_handle_event = self.handle_manager.next_event() => {
                let (handle_id, event) = next_handle_event?;
                Ok(CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event))
            }

            next_item_to_commit = two_phase_handler.next_to_commit() => {
                let (epoch, metadata, schema_change) = next_item_to_commit?;
                Ok(CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change))
            }
        }
    }

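    /// Main coordination loop: recovers persisted two-phase commit state, initializes the
    /// coordinator and writer handles, then alternates between aligning per-epoch commit requests
    /// from the writers and committing prepared epochs to the external sink.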
    async fn run_coordination(
        &mut self,
        db: DatabaseConnection,
        mut coordinator: SinkCommitCoordinator,
        subscriber: SinkCommittedEpochSubscriber,
    ) -> anyhow::Result<()> {
        let sink_id = self.handle_manager.param.sink_id;

        let mut two_phase_handler = self
            .init_state_from_store(&db, sink_id, subscriber, &mut coordinator)
            .await?;
        match &mut coordinator {
            SinkCommitCoordinator::SinglePhase(coordinator) => coordinator.init().await?,
            SinkCommitCoordinator::TwoPhase(coordinator) => coordinator.init().await?,
        }

        let mut running_handles = self.handle_manager.wait_init_handles().await?;
        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
            .await?;

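        // Commit requests are grouped by epoch; an epoch is committed (or pre-committed) only
        // after requests from all writers, i.e. covering all vnodes, have arrived.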
        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
        let mut pending_new_handles = vec![];
        loop {
            let event = self.next_event(&mut two_phase_handler).await?;
            let (handle_id, epoch, commit_request) = match event {
                CoordinatorWorkerEvent::HandleManagerEvent(handle_id, event) => match event {
                    CoordinationHandleManagerEvent::NewHandle => {
                        pending_new_handles.push(handle_id);
                        continue;
                    }
                    CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..).chain([handle_id]))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;
                        continue;
                    }
                    CoordinationHandleManagerEvent::Stop => {
                        self.handle_manager.stop_handle(handle_id)?;
                        running_handles = self
                            .handle_manager
                            .alter_parallelisms(pending_new_handles.drain(..))
                            .await?;
                        self.try_handle_init_requests(&running_handles, &mut two_phase_handler)
                            .await?;

                        continue;
                    }
                    CoordinationHandleManagerEvent::CommitRequest {
                        epoch,
                        metadata,
                        schema_change,
                    } => (handle_id, epoch, (metadata, schema_change)),
                    CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
                        bail!("receive AlignInitialEpoch after initialization")
                    }
                },
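                // A prepared epoch has matured (its hummock epoch is committed): perform the
                // actual commit against the external sink, acking on success and scheduling a
                // retry with backoff on failure.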
                CoordinatorWorkerEvent::ReadyToCommit(epoch, metadata, schema_change) => {
                    let start_time = Instant::now();
                    let commit_fut = async {
                        match &mut coordinator {
                            SinkCommitCoordinator::SinglePhase(coordinator) => {
                                assert!(metadata.is_none());
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "single_phase_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                            SinkCommitCoordinator::TwoPhase(coordinator) => {
                                if let Some(metadata) = metadata {
                                    coordinator
                                        .commit_data(epoch, metadata)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_data",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                                if let Some(schema_change) = schema_change {
                                    coordinator
                                        .commit_schema_change(epoch, schema_change)
                                        .instrument_await(Self::commit_span(
                                            "two_phase_commit_schema_change",
                                            sink_id,
                                            epoch,
                                        ))
                                        .await?;
                                }
                            }
                        }
                        Ok(())
                    };
                    let commit_res =
                        run_future_with_periodic_fn(commit_fut, Duration::from_secs(5), || {
                            warn!(
                                elapsed = ?start_time.elapsed(),
                                %sink_id,
                                "committing"
                            );
                        })
                        .await;

                    match commit_res {
                        Ok(_) => {
                            two_phase_handler.ack_committed(epoch).await?;
                            self.prev_committed_epoch = Some(epoch);
                        }
                        Err(e) => {
                            two_phase_handler.failed_committed(epoch, e);
                        }
                    }

                    continue;
                }
            };
            if !running_handles.contains(&handle_id) {
                bail!(
                    "receiving commit request from non-running handle {}, running handles: {:?}",
                    handle_id,
                    running_handles
                );
            }
            pending_epochs.entry(epoch).or_default().add_new_request(
                handle_id,
                commit_request,
                self.handle_manager.vnode_bitmap(handle_id),
            )?;
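            // Once the earliest pending epoch has received commit requests covering all vnodes,
            // perform the commit: single-phase sinks commit data immediately, while pre-commit
            // metadata and schema changes are persisted and handed to the two-phase handler for
            // the final commit later.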
            if pending_epochs
                .first_key_value()
                .expect("non-empty")
                .1
                .aligned()
            {
                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
                let mut requests = commit_requests.requests.into_iter();
                let (first_metadata, first_schema_change) = requests.next().expect("non-empty");
                metadatas.push(first_metadata);
                for (metadata, schema_change) in requests {
                    if first_schema_change != schema_change {
                        return Err(anyhow!(
                            "got schema change {:?} different from previous schema change {:?}",
                            schema_change,
                            first_schema_change
                        ));
                    }
                    metadatas.push(metadata);
                }

                match &mut coordinator {
                    SinkCommitCoordinator::SinglePhase(coordinator) => {
                        if !metadatas.is_empty() {
                            let start_time = Instant::now();
                            run_future_with_periodic_fn(
                                coordinator.commit_data(epoch, metadatas).instrument_await(
                                    Self::commit_span("single_phase_commit_data", sink_id, epoch),
                                ),
                                Duration::from_secs(5),
                                || {
                                    warn!(
                                        elapsed = ?start_time.elapsed(),
                                        %sink_id,
                                        "committing"
                                    );
                                },
                            )
                            .await
                            .map_err(|e| anyhow!(e))?;
                        }
                        if first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                None,
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(epoch, None, first_schema_change);
                        } else {
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                    SinkCommitCoordinator::TwoPhase(coordinator) => {
                        let commit_metadata = coordinator
                            .pre_commit(epoch, metadatas, first_schema_change.clone())
                            .instrument_await(Self::commit_span(
                                "two_phase_pre_commit",
                                sink_id,
                                epoch,
                            ))
                            .await?;
                        if commit_metadata.is_some() || first_schema_change.is_some() {
                            persist_pre_commit_metadata(
                                &db,
                                sink_id as _,
                                epoch,
                                commit_metadata.clone(),
                                first_schema_change.as_ref(),
                            )
                            .await?;
                            two_phase_handler.push_new_item(
                                epoch,
                                commit_metadata,
                                first_schema_change,
                            );
                        } else {
                            // No data to commit and no schema change.
                            self.prev_committed_epoch = Some(epoch);
                        }
                    }
                }

                self.handle_manager
                    .ack_commit(epoch, commit_requests.handle_ids)?;
            }
        }
    }

    /// Returns a `TwoPhaseCommitHandler` initialized from the persisted state in the meta store.
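    ///
    /// The persisted records are consumed in order: at most one `Committed` record restores
    /// `prev_committed_epoch`, subsequent `Pending` records are re-queued for commit, and any
    /// trailing `Aborted` records are aborted on the coordinator and then cleaned up.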
    async fn init_state_from_store(
        &mut self,
        db: &DatabaseConnection,
        sink_id: SinkId,
        subscriber: SinkCommittedEpochSubscriber,
        coordinator: &mut SinkCommitCoordinator,
    ) -> anyhow::Result<TwoPhaseCommitHandler> {
        let ordered_metadata = list_sink_states_ordered_by_epoch(db, sink_id as _).await?;

        let mut metadata_iter = ordered_metadata.into_iter().peekable();
        let last_committed_epoch = metadata_iter
            .next_if(|(_, state, _, _)| matches!(state, SinkState::Committed))
            .map(|(epoch, _, _, _)| epoch);
        self.prev_committed_epoch = last_committed_epoch;

        let pending_items = metadata_iter
            .peeking_take_while(|(_, state, _, _)| matches!(state, SinkState::Pending))
            .map(|(epoch, _, metadata, schema_change)| (epoch, metadata, schema_change))
            .collect_vec();

        let mut aborted_epochs = vec![];

        for (epoch, state, metadata, _) in metadata_iter {
            match state {
                SinkState::Aborted => {
                    if let Some(metadata) = metadata
                        && let SinkCommitCoordinator::TwoPhase(coordinator) = coordinator
                    {
                        coordinator.abort(epoch, metadata).await;
                    }
                    aborted_epochs.push(epoch);
                }
                other => {
                    unreachable!(
                        "unexpected state {:?} after pending items at epoch {}",
                        other, epoch
                    );
                }
            }
        }

        // Records for all aborted epochs and previously committed epochs are no longer needed.
        clean_aborted_records(db, sink_id, aborted_epochs).await?;

        let (initial_hummock_committed_epoch, job_committed_epoch_rx) = subscriber(sink_id).await?;
        let mut two_phase_handler = TwoPhaseCommitHandler::new(
            db.clone(),
            sink_id,
            initial_hummock_committed_epoch,
            job_committed_epoch_rx,
            last_committed_epoch,
        );

        for (epoch, metadata, schema_change) in pending_items {
            two_phase_handler.push_new_item(epoch, metadata, schema_change);
        }

        Ok(two_phase_handler)
    }

    fn commit_span(stage: &str, sink_id: SinkId, epoch: u64) -> await_tree::Span {
        await_tree::span!("sink_coord_{stage} (sink_id {sink_id}, epoch {epoch})").long_running()
    }
}