risingwave_stream/task/barrier_worker/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::LazyCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::must_match;
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitialPartialGraph,
};
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of the local barrier manager.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors.
    AllCollected(Vec<PbCreateMviewProgress>),
}
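
// Illustrative lifecycle (a sketch, not code from this crate): a barrier enters
// as `Issued` with the full set of actors to collect from, and flips to
// `AllCollected` (via `may_have_collected_all` below) once `remaining_actors`
// drains, e.g.
//
//   Issued { remaining_actors: {1, 2, 3}, .. }
//     -- collect(actor 1) --> Issued { remaining_actors: {2, 3}, .. }
//     -- collect(actor 3) --> Issued { remaining_actors: {2}, .. }
//     -- collect(actor 2) --> AllCollected(create_mview_progress)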

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Graph {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected(_) => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier.
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier.
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}
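
// Status transitions, as implemented in `issue_barrier` and `collect` below:
// an actor starts in `IssuedFirst`, buffering every barrier issued so far; on
// its first `collect()` it switches to `Running(max issued prev_epoch)`, and
// each later `issue_barrier()` just bumps that epoch.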

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` -> partial graph id
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier.
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}
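
// A minimal sketch of the `issue_barrier`/`collect` contract (hypothetical
// epochs e1 < e2, for illustration only):
//
//   state.issue_barrier(graph, &barrier_e1, false)?; // inflight: {e1}
//   state.issue_barrier(graph, &barrier_e2, true)?;  // inflight: {e1, e2}, stopping
//   state.collect(epoch_e1); // -> (graph, false): e2 is still inflight
//   state.collect(epoch_e2); // -> (graph, true): inflight empty && stopping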

/// Part of [`DatabaseManagedBarrierState`].
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`, and the value tracks the barrier (whose epoch's `curr` is the
    /// corresponding `curr_epoch`) together with its collection state.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// This is updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`] and will be reported to meta
    /// in [`crate::task::barrier_worker::BarrierCompleteResult`].
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            mv_depended_subscriptions: Default::default(),
            create_mview_progress: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Temporary placeholder.
    Unspecified,
}
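
// Status transitions handled in this file (`ReceivedExchangeRequest` is
// promoted to `Running` outside this file):
//
//   Running --suspend(err)--> Suspended --start_reset--> Resetting
//   Running --start_reset--> Resetting
//   ReceivedExchangeRequest --start_reset--> Resetting
//
// `Unspecified` only exists transiently inside the `replace(..)` calls in
// `suspend` and `start_reset`.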

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "received duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let mut databases = HashMap::new();
        for database in initial_partial_graphs {
            let database_id = DatabaseId::new(database.database_id);
            assert!(!databases.contains_key(&database_id));
            let state = DatabaseManagedBarrierState::new(
                database_id,
                term_id.clone(),
                actor_manager.clone(),
                database.graphs,
            );
            databases.insert(database_id, DatabaseStatus::Running(state));
        }

        Self { databases }
    }

    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}
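
// Note on `next_event`: each poll re-visits every database and yields the
// first `Ready` event found. `HashMap` iteration order is unspecified, so no
// database is systematically favored, but there is also no strict fairness
// guarantee.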

/// Per-database barrier state manager. Handles barriers for one specific database.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for an architecture overview.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create a barrier manager state. This will be called only once.
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<InitialPartialGraph>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: initial_partial_graphs
                .into_iter()
                .map(|graph| {
                    let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
                    state.add_subscriptions(graph.subscriptions);
                    (PartialGraphId::new(graph.partial_graph_id), state)
                })
                .collect(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}
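
// Replaying the `IssuedFirst` backlog above guarantees that a sender registered
// after some barriers were already issued still observes every one of them, in
// issue order. Once the actor is `Running` the backlog has been dropped, which
// is why late registration is unreachable in that branch.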

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphManagedBarrierState {
    pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_add in subscriptions {
            if !self
                .mv_depended_subscriptions
                .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
                .or_default()
                .insert(subscription_to_add.subscriber_id)
            {
                if cfg!(debug_assertions) {
                    panic!("add an existing subscription: {:?}", subscription_to_add);
                }
                warn!(?subscription_to_add, "add an existing subscription");
            }
        }
    }

    pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_remove in subscriptions {
            let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
            let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
            else {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find upstream mv table to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find upstream mv table to remove"
                );
                continue;
            };
            if !subscribers.remove(&subscription_to_remove.subscriber_id) {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find subscriber to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find subscriber to remove"
                );
            }
            if subscribers.is_empty() {
                self.mv_depended_subscriptions.remove(&upstream_table_id);
            }
        }
    }
}
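
// Shape of `mv_depended_subscriptions` after a few add/remove calls
// (illustrative ids only):
//
//   { TableId(1) => {10, 11}, TableId(2) => {10} }
//
// Removing the last subscriber of a table drops the whole entry, so an empty
// subscriber set is never left behind.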

impl DatabaseManagedBarrierState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to stay compatible with the unit tests. In unit tests that
        // involve the local barrier manager, actors are spawned by the local test logic, but we
        // assume that there is an entry for each spawned actor in `actor_states`, so under
        // cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // Keep `rx` alive in the spawned task so that `tx.send()` will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to the actors only after issuing it to the
        // graph, so that `start_epoch` is called on the graph before the actors receive the
        // barrier.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // Yield any pending collected epochs first.
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}
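
// `poll_next_event` establishes a priority between event sources: actor
// failures are drained first, then barriers that were already fully collected,
// and only then new `LocalBarrierEvent`s. The trailing `debug_assert!` checks
// that draining the event queue never leaves a fully collected barrier
// unreported.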

impl DatabaseManagedBarrierState {
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if no actor error has been received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // Fetch more actor errors within a timeout.
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }
}
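
// Root-cause selection above is a heuristic: after waiting up to 3 seconds for
// straggling errors, every error is scored via `with_score()` and the maximum
// score wins. The scoring policy itself lives in `ScoredStreamError`; the only
// assumption made here is that a higher score means "more likely the root
// cause".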

impl PartialGraphManagedBarrierState {
    /// If the earliest non-`AllCollected` barrier is `Issued` and has been collected from all of
    /// its remaining actors, transform its state to `AllCollected` and return the barrier.
    /// Otherwise, return `None`.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected(_) => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected(create_mview_progress),
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(
        &mut self,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let create_mview_progress = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected(create_mview_progress) => {
            create_mview_progress
        });
        (
            barrier_state.barrier,
            barrier_state.table_ids,
            create_mview_progress,
        )
    }
}
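
// Because `pop_barrier_to_complete` pops via `pop_first` and asserts that the
// popped key equals the requested `prev_epoch`, barriers must be completed
// strictly in epoch order: with pending epochs {e1, e2}, popping e2 before e1
// panics.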

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // An unknown epoch occurs exclusively when the barrier has not yet been injected
                // by the barrier manager, or when the barrier message is blocked at the
                // `RemoteInput` side waiting for injection. Under these conditions, it is
                // impossible for an actor to attempt collection at this point.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        }

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }
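
    // How `table_ids` flows across barrier kinds (a summary of the match above):
    // `Initial` and `Barrier` stash the ids in `prev_barrier_table_ids` and record `None` in the
    // barrier state; only `Checkpoint` records `Some(previous ids)`, falling back to an empty set
    // when the epochs do not chain up.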

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}