1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::fmt::{Debug, Display, Formatter};
17use std::future::{Future, pending, poll_fn};
18use std::mem::replace;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use futures::FutureExt;
25use futures::stream::FuturesOrdered;
26use prometheus::HistogramTimer;
27use risingwave_common::catalog::{DatabaseId, TableId};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_pb::stream_plan::barrier::BarrierKind;
30use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
31use risingwave_storage::StateStoreImpl;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33use tokio::sync::{mpsc, oneshot};
34use tokio::task::JoinHandle;
35
36use crate::error::{StreamError, StreamResult};
37use crate::executor::Barrier;
38use crate::executor::monitor::StreamingMetrics;
39use crate::task::progress::BackfillState;
40use crate::task::{
41 ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
42 StreamActorManager, UpDownActorIds,
43};
44
/// Bookkeeping for a barrier that has been issued and is not yet fully collected.
struct IssuedState {
    /// Actors still outstanding for this barrier. The barrier is treated as
    /// fully collected once this set is empty.
    pub remaining_actors: BTreeSet<ActorId>,

    /// Latency timer; its duration is observed when the barrier leaves the
    /// `Issued` state.
    pub barrier_inflight_latency: HistogramTimer,
}
51
52impl Debug for IssuedState {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("IssuedState")
55 .field("remaining_actors", &self.remaining_actors)
56 .finish()
57 }
58}
59
/// Collection state of a single barrier inside a partial graph.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// The barrier has been issued and may still have actors outstanding.
    Issued(IssuedState),

    /// No actors remain outstanding. The fields hold the per-epoch reports
    /// that were drained from the graph state when the barrier was marked as
    /// collected; they are handed out when the barrier is popped for completion.
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<u32>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<u32>,
        refresh_finished_tables: Vec<u32>,
    },
}
76
/// A barrier together with its collection state.
#[derive(Debug)]
struct BarrierState {
    /// The barrier being tracked.
    barrier: Barrier,
    /// Table ids associated with the barrier; passed through unchanged to
    /// `BarrierToComplete`. NOTE(review): presumably the tables to sync at this
    /// barrier — confirm against where the state is created.
    table_ids: Option<HashSet<TableId>>,
    /// Whether the barrier is still being collected or is fully collected.
    inner: ManagedBarrierStateInner,
}
84
85use risingwave_common::must_match;
86use risingwave_pb::stream_service::InjectBarrierRequest;
87use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
88
89use crate::executor::exchange::permit;
90use crate::executor::exchange::permit::channel_from_config;
91use crate::task::barrier_worker::ScoredStreamError;
92use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
93use crate::task::cdc_progress::CdcTableBackfillState;
94
/// Displayable snapshot of a database's barrier state, used for diagnostics.
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    /// Ids of the actors that currently have an entry in `actor_states`.
    running_actors: BTreeSet<ActorId>,
    /// Borrowed per-partial-graph barrier states.
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}
99
100impl Display for ManagedBarrierStateDebugInfo<'_> {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 write!(f, "running_actors: ")?;
103 for actor_id in &self.running_actors {
104 write!(f, "{}, ", actor_id)?;
105 }
106 for (partial_graph_id, graph_states) in self.graph_states {
107 writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
108 write!(f, "{}", graph_states)?;
109 }
110 Ok(())
111 }
112}
113
114impl Display for &'_ PartialGraphManagedBarrierState {
115 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116 let mut prev_epoch = 0u64;
117 for (epoch, barrier_state) in &self.epoch_barrier_state_map {
118 write!(f, "> Epoch {}: ", epoch)?;
119 match &barrier_state.inner {
120 ManagedBarrierStateInner::Issued(state) => {
121 write!(
122 f,
123 "Issued [{:?}]. Remaining actors: [",
124 barrier_state.barrier.kind
125 )?;
126 let mut is_prev_epoch_issued = false;
127 if prev_epoch != 0 {
128 let bs = &self.epoch_barrier_state_map[&prev_epoch];
129 if let ManagedBarrierStateInner::Issued(IssuedState {
130 remaining_actors: remaining_actors_prev,
131 ..
132 }) = &bs.inner
133 {
134 is_prev_epoch_issued = true;
136 let mut duplicates = 0usize;
137 for actor_id in &state.remaining_actors {
138 if !remaining_actors_prev.contains(actor_id) {
139 write!(f, "{}, ", actor_id)?;
140 } else {
141 duplicates += 1;
142 }
143 }
144 if duplicates > 0 {
145 write!(f, "...and {} actors in prev epoch", duplicates)?;
146 }
147 }
148 }
149 if !is_prev_epoch_issued {
150 for actor_id in &state.remaining_actors {
151 write!(f, "{}, ", actor_id)?;
152 }
153 }
154 write!(f, "]")?;
155 }
156 ManagedBarrierStateInner::AllCollected { .. } => {
157 write!(f, "AllCollected")?;
158 }
159 }
160 prev_epoch = *epoch;
161 writeln!(f)?;
162 }
163
164 if !self.create_mview_progress.is_empty() {
165 writeln!(f, "Create MView Progress:")?;
166 for (epoch, progress) in &self.create_mview_progress {
167 write!(f, "> Epoch {}:", epoch)?;
168 for (actor_id, state) in progress {
169 write!(f, ">> Actor {}: {}, ", actor_id, state)?;
170 }
171 }
172 }
173
174 Ok(())
175 }
176}
177
/// Progress of an actor with respect to the barriers issued to it.
enum InflightActorStatus {
    /// No barrier has been collected from the actor yet. Holds every barrier
    /// issued so far, so that they can be replayed to barrier senders that
    /// register later.
    IssuedFirst(Vec<Barrier>),
    /// At least one barrier has been collected. Holds the `prev` epoch of the
    /// most recently issued barrier.
    Running(u64),
}
184
185impl InflightActorStatus {
186 fn max_issued_epoch(&self) -> u64 {
187 match self {
188 InflightActorStatus::Running(epoch) => *epoch,
189 InflightActorStatus::IssuedFirst(issued_barriers) => {
190 issued_barriers.last().expect("non-empty").epoch.prev
191 }
192 }
193 }
194}
195
/// Per-actor barrier bookkeeping plus the handles of the tasks running the actor.
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    /// Senders registered for this actor; every newly issued barrier is sent
    /// to each of them.
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// Barriers issued but not yet collected: `prev` epoch -> partial graph
    /// the barrier belongs to.
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    /// Whether the first barrier has been collected yet.
    status: InflightActorStatus,
    /// Set once a barrier that stops the actor has been issued.
    is_stopping: bool,

    /// Channel for forwarding new output requests to the actor.
    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    /// Handle of the task running the actor.
    join_handle: JoinHandle<()>,
    /// Handle of the optional monitor task spawned together with the actor.
    monitor_task_handle: Option<JoinHandle<()>>,
}
209
210impl InflightActorState {
211 pub(super) fn start(
212 actor_id: ActorId,
213 initial_partial_graph_id: PartialGraphId,
214 initial_barrier: &Barrier,
215 new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
216 join_handle: JoinHandle<()>,
217 monitor_task_handle: Option<JoinHandle<()>>,
218 ) -> Self {
219 Self {
220 actor_id,
221 barrier_senders: vec![],
222 inflight_barriers: BTreeMap::from_iter([(
223 initial_barrier.epoch.prev,
224 initial_partial_graph_id,
225 )]),
226 status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
227 is_stopping: false,
228 new_output_request_tx,
229 join_handle,
230 monitor_task_handle,
231 }
232 }
233
234 pub(super) fn issue_barrier(
235 &mut self,
236 partial_graph_id: PartialGraphId,
237 barrier: &Barrier,
238 is_stop: bool,
239 ) -> StreamResult<()> {
240 assert!(barrier.epoch.prev > self.status.max_issued_epoch());
241
242 for barrier_sender in &self.barrier_senders {
243 barrier_sender.send(barrier.clone()).map_err(|_| {
244 StreamError::barrier_send(
245 barrier.clone(),
246 self.actor_id,
247 "failed to send to registered sender",
248 )
249 })?;
250 }
251
252 assert!(
253 self.inflight_barriers
254 .insert(barrier.epoch.prev, partial_graph_id)
255 .is_none()
256 );
257
258 match &mut self.status {
259 InflightActorStatus::IssuedFirst(pending_barriers) => {
260 pending_barriers.push(barrier.clone());
261 }
262 InflightActorStatus::Running(prev_epoch) => {
263 *prev_epoch = barrier.epoch.prev;
264 }
265 };
266
267 if is_stop {
268 assert!(!self.is_stopping, "stopped actor should not issue barrier");
269 self.is_stopping = true;
270 }
271 Ok(())
272 }
273
274 pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
275 let (prev_epoch, prev_partial_graph_id) =
276 self.inflight_barriers.pop_first().expect("should exist");
277 assert_eq!(prev_epoch, epoch.prev);
278 match &self.status {
279 InflightActorStatus::IssuedFirst(pending_barriers) => {
280 assert_eq!(
281 prev_epoch,
282 pending_barriers.first().expect("non-empty").epoch.prev
283 );
284 self.status = InflightActorStatus::Running(
285 pending_barriers.last().expect("non-empty").epoch.prev,
286 );
287 }
288 InflightActorStatus::Running(_) => {}
289 }
290 (
291 prev_partial_graph_id,
292 self.inflight_barriers.is_empty() && self.is_stopping,
293 )
294 }
295}
296
/// Barrier state of one partial graph, plus the reports gathered for its epochs.
pub(crate) struct PartialGraphManagedBarrierState {
    /// Tracked barriers, keyed by the barrier's `prev` epoch and therefore
    /// iterated from the earliest epoch onwards.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    /// Starts as `None`. NOTE(review): presumably the epoch and table ids of
    /// the previously issued barrier — confirm against where it is updated.
    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Create-mview progress reported by actors, keyed by `curr` epoch.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    /// Source ids reported as list-finished, keyed by `curr` epoch.
    pub(crate) list_finished_source_ids: HashMap<u64, HashSet<u32>>,

    /// Source ids reported as load-finished, keyed by `curr` epoch.
    pub(crate) load_finished_source_ids: HashMap<u64, HashSet<u32>>,

    /// CDC table backfill progress reported by actors, keyed by `curr` epoch.
    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// Staging table ids recorded by refresh-finished reports, keyed by `curr` epoch.
    pub(crate) truncate_tables: HashMap<u64, HashSet<u32>>,
    /// Table ids reported as refresh-finished, keyed by `curr` epoch.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<u32>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}
335
336impl PartialGraphManagedBarrierState {
337 pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
338 Self::new_inner(
339 actor_manager.env.state_store(),
340 actor_manager.streaming_metrics.clone(),
341 )
342 }
343
344 fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
345 Self {
346 epoch_barrier_state_map: Default::default(),
347 prev_barrier_table_ids: None,
348 create_mview_progress: Default::default(),
349 list_finished_source_ids: Default::default(),
350 load_finished_source_ids: Default::default(),
351 cdc_table_backfill_progress: Default::default(),
352 truncate_tables: Default::default(),
353 refresh_finished_tables: Default::default(),
354 state_store,
355 streaming_metrics,
356 }
357 }
358
359 #[cfg(test)]
360 pub(crate) fn for_test() -> Self {
361 Self::new_inner(
362 StateStoreImpl::for_test(),
363 Arc::new(StreamingMetrics::unused()),
364 )
365 }
366
367 pub(super) fn is_empty(&self) -> bool {
368 self.epoch_barrier_state_map.is_empty()
369 }
370}
371
/// State of a database that has stopped processing and is waiting to be reset.
pub(crate) struct SuspendedDatabaseState {
    /// Time at which this state was created.
    pub(super) suspend_time: Instant,
    /// The barrier state the database had when it was suspended.
    inner: DatabaseManagedBarrierState,
    /// The failure that led to the suspension, if any: the failed actor (when
    /// known) and its error.
    failure: Option<(Option<ActorId>, StreamError)>,
}
377
378impl SuspendedDatabaseState {
379 fn new(
380 state: DatabaseManagedBarrierState,
381 failure: Option<(Option<ActorId>, StreamError)>,
382 _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, ) -> Self {
384 Self {
385 suspend_time: Instant::now(),
386 inner: state,
387 failure,
388 }
389 }
390
391 async fn reset(mut self) -> ResetDatabaseOutput {
392 let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
393 self.inner.abort_and_wait_actors().await;
394 if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
395 hummock.clear_tables(self.inner.table_ids).await;
396 }
397 ResetDatabaseOutput { root_err }
398 }
399}
400
/// State of a database whose reset task is in flight.
pub(crate) struct ResettingDatabaseState {
    /// Handle of the spawned reset task.
    join_handle: JoinHandle<ResetDatabaseOutput>,
    /// Id of the latest reset request received for this database.
    reset_request_id: u32,
}
405
/// Result of a database reset.
pub(crate) struct ResetDatabaseOutput {
    /// The highest-scored error found during the reset, if any.
    pub(crate) root_err: Option<ScoredStreamError>,
}
409
/// Lifecycle state of a database on this node.
pub(crate) enum DatabaseStatus {
    /// Only exchange requests have been received so far. The pending requests
    /// are kept with their response senders and are answered with an error
    /// when the database is aborted or reset.
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    /// The database is processing barriers.
    Running(DatabaseManagedBarrierState),
    /// The database has been suspended and produces no further events.
    Suspended(SuspendedDatabaseState),
    /// A reset task is running for the database.
    Resetting(ResettingDatabaseState),
    /// Placeholder left in place while the status is being replaced; every
    /// method treats observing it as unreachable.
    Unspecified,
}
423
424impl DatabaseStatus {
425 pub(crate) async fn abort(&mut self) {
426 match self {
427 DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
428 for (_, sender) in pending_requests.drain(..) {
429 let _ = sender.send(Err(anyhow!("database aborted").into()));
430 }
431 }
432 DatabaseStatus::Running(state) => {
433 state.abort_and_wait_actors().await;
434 }
435 DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
436 state.abort_and_wait_actors().await;
437 }
438 DatabaseStatus::Resetting(state) => {
439 (&mut state.join_handle)
440 .await
441 .expect("failed to join reset database join handle");
442 }
443 DatabaseStatus::Unspecified => {
444 unreachable!()
445 }
446 }
447 }
448
449 pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
450 match self {
451 DatabaseStatus::ReceivedExchangeRequest(_) => {
452 unreachable!("should not handle request")
453 }
454 DatabaseStatus::Running(state) => Some(state),
455 DatabaseStatus::Suspended(_) => None,
456 DatabaseStatus::Resetting(_) => {
457 unreachable!("should not receive further request during cleaning")
458 }
459 DatabaseStatus::Unspecified => {
460 unreachable!()
461 }
462 }
463 }
464
465 pub(super) fn poll_next_event(
466 &mut self,
467 cx: &mut Context<'_>,
468 ) -> Poll<ManagedBarrierStateEvent> {
469 match self {
470 DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
471 DatabaseStatus::Running(state) => state.poll_next_event(cx),
472 DatabaseStatus::Suspended(_) => Poll::Pending,
473 DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
474 let output = result.expect("should be able to join");
475 ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
476 }),
477 DatabaseStatus::Unspecified => {
478 unreachable!()
479 }
480 }
481 }
482
483 pub(super) fn suspend(
484 &mut self,
485 failed_actor: Option<ActorId>,
486 err: StreamError,
487 completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
488 ) {
489 let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
490 *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
491 state,
492 Some((failed_actor, err)),
493 completing_futures,
494 ));
495 }
496
497 pub(super) fn start_reset(
498 &mut self,
499 database_id: DatabaseId,
500 completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
501 reset_request_id: u32,
502 ) {
503 let join_handle = match replace(self, DatabaseStatus::Unspecified) {
504 DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
505 for (_, sender) in pending_requests {
506 let _ = sender.send(Err(anyhow!("database reset").into()));
507 }
508 tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
509 }
510 DatabaseStatus::Running(state) => {
511 assert_eq!(database_id, state.database_id);
512 info!(
513 database_id = database_id.database_id,
514 reset_request_id, "start database reset from Running"
515 );
516 tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
517 }
518 DatabaseStatus::Suspended(state) => {
519 assert!(
520 completing_futures.is_none(),
521 "should have been clear when suspended"
522 );
523 assert_eq!(database_id, state.inner.database_id);
524 info!(
525 database_id = database_id.database_id,
526 reset_request_id,
527 suspend_elapsed = ?state.suspend_time.elapsed(),
528 "start database reset after suspended"
529 );
530 tokio::spawn(state.reset())
531 }
532 DatabaseStatus::Resetting(state) => {
533 let prev_request_id = state.reset_request_id;
534 info!(
535 database_id = database_id.database_id,
536 reset_request_id, prev_request_id, "receive duplicate reset request"
537 );
538 assert!(reset_request_id > prev_request_id);
539 state.join_handle
540 }
541 DatabaseStatus::Unspecified => {
542 unreachable!()
543 }
544 };
545 *self = DatabaseStatus::Resetting(ResettingDatabaseState {
546 join_handle,
547 reset_request_id,
548 });
549 }
550}
551
/// Barrier state of every database known to this node.
#[derive(Default)]
pub(crate) struct ManagedBarrierState {
    /// Lifecycle status of each database, keyed by database id.
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}
556
/// Event produced by polling a database's barrier state.
pub(super) enum ManagedBarrierStateEvent {
    /// A barrier of the given partial graph has no actors left to collect.
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    /// An actor reported a failure, or a barrier could not be delivered to it.
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    /// A database reset finished; carries its output and the reset request id.
    DatabaseReset(ResetDatabaseOutput, u32),
}
568
569impl ManagedBarrierState {
570 pub(super) fn next_event(
571 &mut self,
572 ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
573 poll_fn(|cx| {
574 for (database_id, database) in &mut self.databases {
575 if let Poll::Ready(event) = database.poll_next_event(cx) {
576 return Poll::Ready((*database_id, event));
577 }
578 }
579 Poll::Pending
580 })
581 }
582}
583
/// Barrier state of a single running database: its actors, its partial graphs
/// and the channels on which actors report back.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    /// State of every actor that has been started and has not finished.
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    /// Output requests addressed to upstream actors that have no state yet,
    /// keyed by the upstream actor id; they are forwarded when that actor is built.
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    /// Barrier state of each partial graph.
    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    /// Every table id seen in an injected barrier request; these tables are
    /// cleared when the database is reset.
    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    /// Events reported through the local barrier manager.
    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    /// Actor failures reported through the local barrier manager.
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}
605
606impl DatabaseManagedBarrierState {
607 pub(super) fn new(
609 database_id: DatabaseId,
610 term_id: String,
611 actor_manager: Arc<StreamActorManager>,
612 ) -> Self {
613 let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
614 LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
615 Self {
616 database_id,
617 actor_states: Default::default(),
618 actor_pending_new_output_requests: Default::default(),
619 graph_states: Default::default(),
620 table_ids: Default::default(),
621 actor_manager,
622 local_barrier_manager,
623 barrier_event_rx,
624 actor_failure_rx,
625 }
626 }
627
628 pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
629 ManagedBarrierStateDebugInfo {
630 running_actors: self.actor_states.keys().cloned().collect(),
631 graph_states: &self.graph_states,
632 }
633 }
634
635 async fn abort_and_wait_actors(&mut self) {
636 for (actor_id, state) in &self.actor_states {
637 tracing::debug!("force stopping actor {}", actor_id);
638 state.join_handle.abort();
639 if let Some(monitor_task_handle) = &state.monitor_task_handle {
640 monitor_task_handle.abort();
641 }
642 }
643
644 for (actor_id, state) in self.actor_states.drain() {
645 tracing::debug!("join actor {}", actor_id);
646 let result = state.join_handle.await;
647 assert!(result.is_ok() || result.unwrap_err().is_cancelled());
648 }
649 }
650}
651
652impl InflightActorState {
653 pub(super) fn register_barrier_sender(
654 &mut self,
655 tx: mpsc::UnboundedSender<Barrier>,
656 ) -> StreamResult<()> {
657 match &self.status {
658 InflightActorStatus::IssuedFirst(pending_barriers) => {
659 for barrier in pending_barriers {
660 tx.send(barrier.clone()).map_err(|_| {
661 StreamError::barrier_send(
662 barrier.clone(),
663 self.actor_id,
664 "failed to send pending barriers to newly registered sender",
665 )
666 })?;
667 }
668 self.barrier_senders.push(tx);
669 }
670 InflightActorStatus::Running(_) => {
671 unreachable!("should not register barrier sender when entering Running status")
672 }
673 }
674 Ok(())
675 }
676}
677
678impl DatabaseManagedBarrierState {
679 pub(super) fn register_barrier_sender(
680 &mut self,
681 actor_id: ActorId,
682 tx: mpsc::UnboundedSender<Barrier>,
683 ) -> StreamResult<()> {
684 self.actor_states
685 .get_mut(&actor_id)
686 .expect("should exist")
687 .register_barrier_sender(tx)
688 }
689}
690
impl DatabaseManagedBarrierState {
    /// Applies an injected barrier: marks it as issued in its partial graph,
    /// builds the actors listed in the request, and issues the barrier to the
    /// already existing actors that must collect it.
    ///
    /// # Errors
    ///
    /// Returns an error if the barrier cannot be sent to an existing actor.
    ///
    /// # Panics
    ///
    /// Panics if the partial graph is unknown, if an actor to build is also
    /// stopped by this barrier, is listed twice, is missing from the actors to
    /// collect or already has a state, or if an actor to collect has no state.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        // Remember every table seen so far; the set is consumed on reset.
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        // Actors started by this barrier. Their initial state already records
        // the barrier, so it is not issued to them again below.
        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    // All actors of a fragment share the same node.
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            // Forward output requests that arrived before this actor existed.
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Test builds only: an actor to collect that has no state gets a dummy
        // state whose task merely holds the request receiver and never completes.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // Keep the receiver alive inside the task.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Issue the barrier to the existing actors that have to collect it.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    /// Handles an output request coming from a remote downstream actor: a new
    /// permit channel is created, its sending half is passed on as a remote
    /// output request for `upstream_actor_id`, and its receiving half is
    /// returned through `result_sender`.
    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        // The requester may already be gone; a failed send is ignored.
        let _ = result_sender.send(Ok(rx));
    }

    /// Delivers an output request from `actor_id` to `upstream_actor_id`.
    ///
    /// If the upstream actor has no state yet, the request is buffered until
    /// that actor is built.
    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Polls the next event of this database.
    ///
    /// Sources are checked in this order: reported actor failures, barriers
    /// that are already fully collected in some partial graph, and finally the
    /// queued local barrier events, which are processed until one of them
    /// yields an event or the queue is drained.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    // A failed registration is surfaced as an error of that actor.
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        // Sanity check: no partial graph should have a fully collected barrier
        // left unreported at this point.
        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}
952
953impl DatabaseManagedBarrierState {
954 #[must_use]
955 pub(super) fn collect(
956 &mut self,
957 actor_id: ActorId,
958 epoch: EpochPair,
959 ) -> Option<(PartialGraphId, Barrier)> {
960 let (prev_partial_graph_id, is_finished) = self
961 .actor_states
962 .get_mut(&actor_id)
963 .expect("should exist")
964 .collect(epoch);
965 if is_finished {
966 let state = self.actor_states.remove(&actor_id).expect("should exist");
967 if let Some(monitor_task_handle) = state.monitor_task_handle {
968 monitor_task_handle.abort();
969 }
970 }
971 let prev_graph_state = self
972 .graph_states
973 .get_mut(&prev_partial_graph_id)
974 .expect("should exist");
975 prev_graph_state.collect(actor_id, epoch);
976 prev_graph_state
977 .may_have_collected_all()
978 .map(|barrier| (prev_partial_graph_id, barrier))
979 }
980
981 #[allow(clippy::type_complexity)]
982 pub(super) fn pop_barrier_to_complete(
983 &mut self,
984 partial_graph_id: PartialGraphId,
985 prev_epoch: u64,
986 ) -> BarrierToComplete {
987 self.graph_states
988 .get_mut(&partial_graph_id)
989 .expect("should exist")
990 .pop_barrier_to_complete(prev_epoch)
991 }
992
993 async fn try_find_root_actor_failure(
997 &mut self,
998 first_failure: Option<(Option<ActorId>, StreamError)>,
999 ) -> Option<ScoredStreamError> {
1000 let mut later_errs = vec![];
1001 let _ = tokio::time::timeout(Duration::from_secs(3), async {
1003 let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
1004 if let Some((Some(failed_actor), _)) = &first_failure {
1005 uncollected_actors.remove(failed_actor);
1006 }
1007 while !uncollected_actors.is_empty()
1008 && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
1009 {
1010 uncollected_actors.remove(&actor_id);
1011 later_errs.push(error);
1012 }
1013 })
1014 .await;
1015
1016 first_failure
1017 .into_iter()
1018 .map(|(_, err)| err)
1019 .chain(later_errs.into_iter())
1020 .map(|e| e.with_score())
1021 .max_by_key(|e| e.score)
1022 }
1023
1024 pub(super) fn report_source_list_finished(
1026 &mut self,
1027 epoch: EpochPair,
1028 actor_id: ActorId,
1029 _table_id: u32,
1030 associated_source_id: u32,
1031 ) {
1032 if let Some(actor_state) = self.actor_states.get(&actor_id)
1034 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
1035 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
1036 {
1037 graph_state
1038 .list_finished_source_ids
1039 .entry(epoch.curr)
1040 .or_default()
1041 .insert(associated_source_id);
1042 } else {
1043 warn!(
1044 ?epoch,
1045 actor_id, associated_source_id, "ignore source list finished"
1046 );
1047 }
1048 }
1049
1050 pub(super) fn report_source_load_finished(
1052 &mut self,
1053 epoch: EpochPair,
1054 actor_id: ActorId,
1055 _table_id: u32,
1056 associated_source_id: u32,
1057 ) {
1058 if let Some(actor_state) = self.actor_states.get(&actor_id)
1060 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
1061 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
1062 {
1063 graph_state
1064 .load_finished_source_ids
1065 .entry(epoch.curr)
1066 .or_default()
1067 .insert(associated_source_id);
1068 } else {
1069 warn!(
1070 ?epoch,
1071 actor_id, associated_source_id, "ignore source load finished"
1072 );
1073 }
1074 }
1075
1076 pub(super) fn report_refresh_finished(
1078 &mut self,
1079 epoch: EpochPair,
1080 actor_id: ActorId,
1081 table_id: u32,
1082 staging_table_id: u32,
1083 ) {
1084 let Some(actor_state) = self.actor_states.get(&actor_id) else {
1086 warn!(
1087 ?epoch,
1088 actor_id, table_id, "ignore refresh finished table: actor_state not found"
1089 );
1090 return;
1091 };
1092 let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) else {
1093 let inflight_barriers = actor_state.inflight_barriers.keys().collect::<Vec<_>>();
1094 warn!(
1095 ?epoch,
1096 actor_id,
1097 table_id,
1098 ?inflight_barriers,
1099 "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
1100 );
1101 return;
1102 };
1103 let Some(graph_state) = self.graph_states.get_mut(partial_graph_id) else {
1104 warn!(
1105 ?epoch,
1106 actor_id, table_id, "ignore refresh finished table: graph_state not found"
1107 );
1108 return;
1109 };
1110 graph_state
1111 .refresh_finished_tables
1112 .entry(epoch.curr)
1113 .or_default()
1114 .insert(table_id);
1115 graph_state
1116 .truncate_tables
1117 .entry(epoch.curr)
1118 .or_default()
1119 .insert(staging_table_id);
1120 }
1121}
1122
impl PartialGraphManagedBarrierState {
    /// Finds the earliest barrier that has just become fully collected, marks
    /// it as `AllCollected` and returns a clone of it.
    ///
    /// Epochs are scanned in ascending order. Entries that are already
    /// `AllCollected` are skipped, and the scan stops at the first `Issued`
    /// entry that still has remaining actors. Returns `None` if no entry
    /// changes state.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                // Issued with nothing left to collect: fall through to the
                // transition below.
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                // Still waiting for actors: later epochs are not examined.
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            // Drain everything reported for this barrier's `curr` epoch.
            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            // The replaced state is always `Issued` here; observe its latency timer.
            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    /// Removes the earliest tracked barrier and returns it together with the
    /// reports gathered for it.
    ///
    /// # Panics
    ///
    /// Panics if no barrier is tracked, if the earliest barrier's key differs
    /// from `prev_epoch`, or if that barrier is not `AllCollected`.
    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}
1247
/// Everything recorded for one fully collected barrier, as produced by
/// `PartialGraphManagedBarrierState::pop_barrier_to_complete` from an `AllCollected` state.
pub(crate) struct BarrierToComplete {
    /// The barrier whose collection has finished.
    pub barrier: Barrier,
    /// Table ids stored with the barrier state when it was issued: `None` for `Initial` and
    /// `Barrier` kinds, `Some` for `Checkpoint` barriers.
    pub table_ids: Option<HashSet<TableId>>,
    /// Create-mview progress entries taken for the barrier's `curr` epoch, in protobuf form.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    /// Ids taken from the `list_finished_source_ids` record of the barrier's `curr` epoch.
    pub list_finished_source_ids: Vec<u32>,
    /// Ids taken from the `load_finished_source_ids` record of the barrier's `curr` epoch.
    pub load_finished_source_ids: Vec<u32>,
    /// Ids taken from the `truncate_tables` record of the barrier's `curr` epoch.
    pub truncate_tables: Vec<u32>,
    /// Ids taken from the `refresh_finished_tables` record of the barrier's `curr` epoch.
    pub refresh_finished_tables: Vec<u32>,
    /// CDC table backfill progress entries taken for the barrier's `curr` epoch, in protobuf
    /// form.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}
1258
1259impl PartialGraphManagedBarrierState {
1260 pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
1262 tracing::debug!(
1263 target: "events::stream::barrier::manager::collect",
1264 ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
1265 "collect_barrier",
1266 );
1267
1268 match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
1269 None => {
1270 panic!(
1274 "cannot collect new actor barrier {:?} at current state: None",
1275 epoch,
1276 )
1277 }
1278 Some(&mut BarrierState {
1279 ref barrier,
1280 inner:
1281 ManagedBarrierStateInner::Issued(IssuedState {
1282 ref mut remaining_actors,
1283 ..
1284 }),
1285 ..
1286 }) => {
1287 let exist = remaining_actors.remove(&actor_id);
1288 assert!(
1289 exist,
1290 "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
1291 actor_id, epoch.curr
1292 );
1293 assert_eq!(barrier.epoch.curr, epoch.curr);
1294 }
1295 Some(BarrierState { inner, .. }) => {
1296 panic!(
1297 "cannot collect new actor barrier {:?} at current state: {:?}",
1298 epoch, inner
1299 )
1300 }
1301 }
1302 }
1303
1304 pub(super) fn transform_to_issued(
1307 &mut self,
1308 barrier: &Barrier,
1309 actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
1310 table_ids: HashSet<TableId>,
1311 ) {
1312 let timer = self
1313 .streaming_metrics
1314 .barrier_inflight_latency
1315 .start_timer();
1316
1317 if let Some(hummock) = self.state_store.as_hummock() {
1318 hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
1319 }
1320
1321 let table_ids = match barrier.kind {
1322 BarrierKind::Unspecified => {
1323 unreachable!()
1324 }
1325 BarrierKind::Initial => {
1326 assert!(
1327 self.prev_barrier_table_ids.is_none(),
1328 "non empty table_ids at initial barrier: {:?}",
1329 self.prev_barrier_table_ids
1330 );
1331 info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
1332 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1333 None
1334 }
1335 BarrierKind::Barrier => {
1336 if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
1337 assert_eq!(prev_epoch.curr, barrier.epoch.prev);
1338 assert_eq!(prev_table_ids, &table_ids);
1339 *prev_epoch = barrier.epoch;
1340 } else {
1341 info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
1342 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1343 }
1344 None
1345 }
1346 BarrierKind::Checkpoint => Some(
1347 if let Some((prev_epoch, prev_table_ids)) = self
1348 .prev_barrier_table_ids
1349 .replace((barrier.epoch, table_ids))
1350 && prev_epoch.curr == barrier.epoch.prev
1351 {
1352 prev_table_ids
1353 } else {
1354 debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
1355 HashSet::new()
1356 },
1357 ),
1358 };
1359
1360 if let Some(&mut BarrierState { ref inner, .. }) =
1361 self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
1362 {
1363 {
1364 panic!(
1365 "barrier epochs{:?} state has already been `Issued`. Current state: {:?}",
1366 barrier.epoch, inner
1367 );
1368 }
1369 };
1370
1371 self.epoch_barrier_state_map.insert(
1372 barrier.epoch.prev,
1373 BarrierState {
1374 barrier: barrier.clone(),
1375 inner: ManagedBarrierStateInner::Issued(IssuedState {
1376 remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
1377 barrier_inflight_latency: timer,
1378 }),
1379 table_ids,
1380 },
1381 );
1382 }
1383
1384 #[cfg(test)]
1385 async fn pop_next_completed_epoch(&mut self) -> u64 {
1386 if let Some(barrier) = self.may_have_collected_all() {
1387 self.pop_barrier_to_complete(barrier.epoch.prev);
1388 return barrier.epoch.prev;
1389 }
1390 pending().await
1391 }
1392}
1393
1394#[cfg(test)]
1395mod tests {
1396 use std::collections::HashSet;
1397
1398 use risingwave_common::util::epoch::test_epoch;
1399
1400 use crate::executor::Barrier;
1401 use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;
1402
1403 #[tokio::test]
1404 async fn test_managed_state_add_actor() {
1405 let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1406 let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1407 let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1408 let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1409 let actor_ids_to_collect1 = HashSet::from([1, 2]);
1410 let actor_ids_to_collect2 = HashSet::from([1, 2]);
1411 let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
1412 managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1413 managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1414 managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1415 managed_barrier_state.collect(1, barrier1.epoch);
1416 managed_barrier_state.collect(2, barrier1.epoch);
1417 assert_eq!(
1418 managed_barrier_state.pop_next_completed_epoch().await,
1419 test_epoch(0)
1420 );
1421 assert_eq!(
1422 managed_barrier_state
1423 .epoch_barrier_state_map
1424 .first_key_value()
1425 .unwrap()
1426 .0,
1427 &test_epoch(1)
1428 );
1429 managed_barrier_state.collect(1, barrier2.epoch);
1430 managed_barrier_state.collect(1, barrier3.epoch);
1431 managed_barrier_state.collect(2, barrier2.epoch);
1432 assert_eq!(
1433 managed_barrier_state.pop_next_completed_epoch().await,
1434 test_epoch(1)
1435 );
1436 assert_eq!(
1437 managed_barrier_state
1438 .epoch_barrier_state_map
1439 .first_key_value()
1440 .unwrap()
1441 .0,
1442 &test_epoch(2)
1443 );
1444 managed_barrier_state.collect(2, barrier3.epoch);
1445 managed_barrier_state.collect(3, barrier3.epoch);
1446 assert_eq!(
1447 managed_barrier_state.pop_next_completed_epoch().await,
1448 test_epoch(2)
1449 );
1450 assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1451 }
1452
1453 #[tokio::test]
1454 async fn test_managed_state_stop_actor() {
1455 let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1456 let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1457 let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1458 let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1459 let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
1460 let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
1461 let actor_ids_to_collect3 = HashSet::from([1, 2]);
1462 managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1463 managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1464 managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1465
1466 managed_barrier_state.collect(1, barrier1.epoch);
1467 managed_barrier_state.collect(1, barrier2.epoch);
1468 managed_barrier_state.collect(1, barrier3.epoch);
1469 managed_barrier_state.collect(2, barrier1.epoch);
1470 managed_barrier_state.collect(2, barrier2.epoch);
1471 managed_barrier_state.collect(2, barrier3.epoch);
1472 assert_eq!(
1473 managed_barrier_state
1474 .epoch_barrier_state_map
1475 .first_key_value()
1476 .unwrap()
1477 .0,
1478 &0
1479 );
1480 managed_barrier_state.collect(3, barrier1.epoch);
1481 managed_barrier_state.collect(3, barrier2.epoch);
1482 assert_eq!(
1483 managed_barrier_state
1484 .epoch_barrier_state_map
1485 .first_key_value()
1486 .unwrap()
1487 .0,
1488 &0
1489 );
1490 managed_barrier_state.collect(4, barrier1.epoch);
1491 assert_eq!(
1492 managed_barrier_state.pop_next_completed_epoch().await,
1493 test_epoch(0)
1494 );
1495 assert_eq!(
1496 managed_barrier_state.pop_next_completed_epoch().await,
1497 test_epoch(1)
1498 );
1499 assert_eq!(
1500 managed_barrier_state.pop_next_completed_epoch().await,
1501 test_epoch(2)
1502 );
1503 assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1504 }
1505}