risingwave_stream/task/barrier_worker/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::LazyCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of the local barrier manager.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors.
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    },
}

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitialPartialGraph,
};

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::cdc_progress::CdcTableBackfillState;

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not yet collected the first one.
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first one.
    Running(u64),
}

impl InflightActorStatus {
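    /// Returns the `prev` epoch of the most recently issued barrier for this actor.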
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` -> partial graph id
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }
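
    /// Issue `barrier` to this actor: forward it to all registered barrier senders and record
    /// it as inflight under `partial_graph_id`. Barriers must be issued with increasing epochs.
    /// If `is_stop` is true, the actor is marked as stopping so that it can be removed after
    /// all of its inflight barriers are collected.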
    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }
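
    /// Collect a barrier from this actor. The collected barrier must be the earliest inflight
    /// one. Returns the partial graph the barrier belongs to and whether the actor has finished,
    /// i.e. it is stopping and has no more inflight barriers.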
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

/// Part of [`DatabaseManagedBarrierState`]
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`, and the first value is `curr_epoch`
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// The process of progress reporting is as follows:
    /// 1. updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`]
    /// 2. converted to [`ManagedBarrierStateInner`] in [`Self::may_have_collected_all`]
    /// 3. handled by [`Self::pop_barrier_to_complete`]
    /// 4. put in [`crate::task::barrier_worker::BarrierCompleteResult`] and reported to meta.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    /// Record the source load finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source.
    pub(crate) load_finished_source_ids: HashMap<u64, HashSet<u32>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            mv_depended_subscriptions: Default::default(),
            create_mview_progress: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Temporary placeholder.
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }
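
    /// Transition this database from `Running` to `Suspended`, recording the failing actor
    /// (if any) and the error. Any in-flight completing futures passed in are discarded.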
    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }
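
    /// Start resetting this database, transitioning to `Resetting`. Depending on the current
    /// status, this spawns a task that collects the root failure, aborts all actors, and clears
    /// the database's tables from the state store; a duplicate reset request reuses the reset
    /// task already in flight and only records the newer request id.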
    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let mut databases = HashMap::new();
        for database in initial_partial_graphs {
            let database_id = DatabaseId::new(database.database_id);
            assert!(!databases.contains_key(&database_id));
            let state = DatabaseManagedBarrierState::new(
                database_id,
                term_id.clone(),
                actor_manager.clone(),
                database.graphs,
            );
            databases.insert(database_id, DatabaseStatus::Running(state));
        }

        Self { databases }
    }
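
    /// Returns a future that resolves to the next event produced by any of the managed
    /// databases, together with the id of the database it came from.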
    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

/// Per-database barrier state manager. Handles barriers for one specific database.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create a barrier manager state. This will be called only once.
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<InitialPartialGraph>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: initial_partial_graphs
                .into_iter()
                .map(|graph| {
                    let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
                    state.add_subscriptions(graph.subscriptions);
                    (PartialGraphId::new(graph.partial_graph_id), state)
                })
                .collect(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }
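
    /// Abort all running actors of this database and wait for their tasks to finish. All join
    /// handles are aborted first so the actors shut down concurrently, then each handle is
    /// awaited, accepting only a clean exit or cancellation.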
    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphManagedBarrierState {
    pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_add in subscriptions {
            if !self
                .mv_depended_subscriptions
                .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
                .or_default()
                .insert(subscription_to_add.subscriber_id)
            {
                if cfg!(debug_assertions) {
                    panic!("add an existing subscription: {:?}", subscription_to_add);
                }
                warn!(?subscription_to_add, "add an existing subscription");
            }
        }
    }

    pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_remove in subscriptions {
            let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
            let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
            else {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find upstream mv table to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find upstream mv table to remove"
                );
                continue;
            };
            if !subscribers.remove(&subscription_to_remove.subscriber_id) {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find subscriber to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find subscriber to remove"
                );
            }
            if subscribers.is_empty() {
                self.mv_depended_subscriptions.remove(&upstream_table_id);
            }
        }
    }
}

impl DatabaseManagedBarrierState {
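    /// Handle an [`InjectBarrierRequest`] from the barrier worker: update subscriptions and the
    /// set of tables to sync, mark the barrier as issued in the target partial graph, spawn any
    /// newly built actors, and finally issue the barrier to all existing actors that need to
    /// collect it.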
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to be compatible with the unit tests. In the unit tests
        // that involve the local barrier manager, actors are spawned in the local test logic,
        // but we assume that there is an entry for each spawned actor in `actor_states`, so
        // under cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // Keep `rx` alive in the spawned task so that `tx.send()` will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to the actors only after issuing it to the
        // graph, to ensure that `start_epoch` is called on the graph before the actors receive
        // the barrier.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }
    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // yield some pending collected epochs
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
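    /// Collect a barrier from an actor. Updates both the actor's inflight state and the owning
    /// partial graph; if the actor has fully stopped, its state is removed. Returns
    /// `Some((partial_graph_id, barrier))` once the partial graph has collected the barrier
    /// from all of its actors.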
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    #[allow(clippy::type_complexity)]
    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
        Vec<u32>,
        Vec<PbCdcTableBackfillProgress>,
    ) {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if there's no actor error received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Report that a source has finished loading for a specific epoch.
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        _table_id: u32,
        associated_source_id: u32,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .insert(associated_source_id);
        } else {
            warn!(
                ?epoch,
                actor_id, associated_source_id, "ignore source load finished"
            );
        }
    }
}

impl PartialGraphManagedBarrierState {
    /// Called whenever the barrier state may have changed, to check whether the earliest
    /// `Issued` barriers have been collected from all actors. If so, their state is transformed
    /// to `AllCollected`, and the first barrier that newly became fully collected is returned.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    #[allow(clippy::type_complexity)]
    fn pop_barrier_to_complete(
        &mut self,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
        Vec<u32>,
        Vec<PbCdcTableBackfillProgress>,
    ) {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (create_mview_progress, load_finished_source_ids, cdc_table_backfill_progress) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, load_finished_source_ids, cdc_table_backfill_progress)
        });
        (
            barrier_state.barrier,
            barrier_state.table_ids,
            create_mview_progress,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        )
    }
}

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // A missing entry means the barrier has not been injected by the barrier manager,
                // or the barrier message is still blocked at the `RemoteInput` side waiting for
                // injection. In either case, no actor should be able to collect it at this point.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(&mut BarrierState { ref inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}