risingwave_stream/task/barrier_worker/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of local barrier manager.
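///
/// A barrier for one epoch moves from `Issued` to `AllCollected`. Illustrative
/// lifecycle (hypothetical actor ids; a sketch, not a doc test):
///
/// ```ignore
/// // transform_to_issued(barrier)  => Issued { remaining_actors: {1, 2}, .. }
/// // collect(actor 1, epoch)       => Issued { remaining_actors: {2}, .. }
/// // collect(actor 2, epoch)       => Issued { remaining_actors: {}, .. }
/// // may_have_collected_all()      => AllCollected { .. }, barrier returned
/// ```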
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<u32>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<u32>,
        refresh_finished_tables: Vec<u32>,
    },
}

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::cdc_progress::CdcTableBackfillState;

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` -> partial graph id
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

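    /// Collect a barrier for this actor: pop the oldest inflight barrier, which
    /// must match `epoch.prev`, and return its partial graph id together with
    /// whether the actor has finished (no inflight barriers left after a stop
    /// barrier was issued). A sketch with hypothetical epochs:
    ///
    /// ```ignore
    /// // inflight_barriers: {100 -> g1, 200 -> g1}, is_stopping: true
    /// // collect(EpochPair { prev: 100, .. }) => (g1, false)
    /// // collect(EpochPair { prev: 200, .. }) => (g1, true) // actor state can be dropped
    /// ```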
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

/// Part of [`DatabaseManagedBarrierState`]
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`, and the value tracks the state of the barrier
    /// whose epoch is (`prev_epoch`, `curr_epoch`).
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// The process of progress reporting is as follows:
    /// 1. updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`]
    /// 2. converted to [`ManagedBarrierStateInner`] in [`Self::may_have_collected_all`]
    /// 3. handled by [`Self::pop_barrier_to_complete`]
    /// 4. put in [`crate::task::barrier_worker::BarrierCompleteResult`] and reported to meta.
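    ///
    /// A minimal sketch of the flow above (hypothetical `reporter` and epochs;
    /// not a doc test):
    ///
    /// ```ignore
    /// // 1. actor side: reporter.update(epoch, ..) sends a LocalBarrierEvent
    /// // 2./3. worker side, once the epoch is fully collected:
    /// let barrier = graph_state.may_have_collected_all().unwrap();
    /// let complete = graph_state.pop_barrier_to_complete(barrier.epoch.prev);
    /// // 4. complete.create_mview_progress is reported to meta
    /// ```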
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    /// Record the source list finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source.
    pub(crate) list_finished_source_ids: HashMap<u64, HashSet<u32>>,

    /// Record the source load finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source.
    pub(crate) load_finished_source_ids: HashMap<u64, HashSet<u32>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// Record the tables to truncate for each epoch of concurrent checkpoints.
    pub(crate) truncate_tables: HashMap<u64, HashSet<u32>>,
    /// Record the tables that have finished refresh for each epoch of concurrent checkpoints.
    /// Used for materialized view refresh completion reporting.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<u32>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            create_mview_progress: Default::default(),
            list_finished_source_ids: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            truncate_tables: Default::default(),
            refresh_finished_tables: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

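/// Status of one database inside the barrier worker. Rough lifecycle, as
/// driven by [`Self::suspend`] and [`Self::start_reset`] (illustrative, not
/// exhaustive):
///
/// ```ignore
/// // Running ---------------------- suspend(err) --> Suspended
/// // Running | Suspended | Resetting - start_reset -> Resetting
/// // Resetting -- join handle completes ------------> DatabaseReset event
/// ```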
pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Temporary placeholder.
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

#[derive(Default)]
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
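    /// Poll every database for its next event and yield the first one that is
    /// ready, together with its [`DatabaseId`]. A minimal usage sketch
    /// (assuming a `state: ManagedBarrierState` owned by the worker loop):
    ///
    /// ```ignore
    /// let (database_id, event) = state.next_event().await;
    /// ```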
    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

/// Per-database barrier state manager. Handles barriers for one specific database.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create a barrier manager state. This will be called only once.
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: Default::default(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to be compatible with the unit tests. In the unit tests
        // that involve the local barrier manager, actors are spawned by the local test logic,
        // but we assume that there is an entry for each spawned actor in `actor_states`, so
        // under cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // The rx is kept alive in the spawned task so that tx.send() will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to actors only after issuing it to the
        // graph, to ensure that `start_epoch` is called on the graph before the actors
        // receive the barrier.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

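    /// Route a new output request to the upstream actor if it is already
    /// running; otherwise buffer it in `actor_pending_new_output_requests`,
    /// from which it is drained when the actor is spawned in
    /// [`Self::transform_to_issued`].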
    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // yield some pending collected epochs
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    #[allow(clippy::type_complexity)]
    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> BarrierToComplete {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if there's no actor error received.
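    ///
    /// All candidate errors are scored and the highest score wins; a sketch of
    /// the selection (see [`ScoredStreamError`]):
    ///
    /// ```ignore
    /// first_failure.into_iter().map(|(_, e)| e)
    ///     .chain(later_errs)
    ///     .map(|e| e.with_score())
    ///     .max_by_key(|e| e.score)
    /// ```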
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Report that a source has finished listing for a specific epoch
    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        _table_id: u32,
        associated_source_id: u32,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .insert(associated_source_id);
        } else {
            warn!(
                ?epoch,
                actor_id, associated_source_id, "ignore source list finished"
            );
        }
    }

    /// Report that a source has finished loading for a specific epoch
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        _table_id: u32,
        associated_source_id: u32,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .insert(associated_source_id);
        } else {
            warn!(
                ?epoch,
                actor_id, associated_source_id, "ignore source load finished"
            );
        }
    }

    /// Report that a table has finished refreshing for a specific epoch
    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: u32,
        staging_table_id: u32,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                actor_id, table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) else {
            let inflight_barriers = actor_state.inflight_barriers.keys().collect::<Vec<_>>();
            warn!(
                ?epoch,
                actor_id,
                table_id,
                ?inflight_barriers,
                "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
            );
            return;
        };
        let Some(graph_state) = self.graph_states.get_mut(partial_graph_id) else {
            warn!(
                ?epoch,
                actor_id, table_id, "ignore refresh finished table: graph_state not found"
            );
            return;
        };
        graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}

impl PartialGraphManagedBarrierState {
    /// This method is called when barrier state is modified, to transform the state to
    /// `AllCollected` and start state store `sync` when the barrier has been collected
    /// from all actors for an `Issued` barrier.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}

pub(crate) struct BarrierToComplete {
    pub barrier: Barrier,
    pub table_ids: Option<HashSet<TableId>>,
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    pub list_finished_source_ids: Vec<u32>,
    pub load_finished_source_ids: Vec<u32>,
    pub truncate_tables: Vec<u32>,
    pub refresh_finished_tables: Vec<u32>,
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // A missing state occurs only when the barrier has not yet been injected by
                // the barrier manager, or when the barrier message is still blocked at the
                // `RemoteInput` side waiting for injection. In either case, no actor can have
                // received the barrier, so none should attempt to collect it here.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
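    ///
    /// For a `Checkpoint` barrier, the `table_ids` recorded at the previous
    /// barrier are carried into [`BarrierState::table_ids`] and eventually into
    /// [`BarrierToComplete::table_ids`]; for other barrier kinds no table ids
    /// are attached.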
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epochs {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}