use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::SourceId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

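/// State of a barrier that has been issued to actors but not yet collected
/// from all of them. The inflight latency timer is stopped once the last
/// remaining actor collects the barrier.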
struct IssuedState {
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

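/// Collection state of one in-flight barrier: `Issued` while some actors
/// still need to collect it, then `AllCollected` carrying the progress
/// reports gathered for its epoch.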
#[derive(Debug)]
enum ManagedBarrierStateInner {
    Issued(IssuedState),

    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    },
}

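/// A single in-flight barrier tracked in `epoch_barrier_state_map`.
/// `table_ids` is the set of state tables to sync when this barrier
/// completes; it is `Some` only for checkpoint barriers.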
#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::InjectBarrierRequest;

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::cdc_progress::CdcTableBackfillState;

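/// Snapshot of a database's barrier state for human-readable debug output,
/// rendered by the `Display` implementations below.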
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Graph {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

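/// Barrier-issuance status of an in-flight actor. `IssuedFirst` buffers every
/// barrier issued before the first one is collected, so they can be replayed
/// to a barrier sender registered later; after the first collection the
/// status becomes `Running`, tracking only the latest issued `epoch.prev`.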
enum InflightActorStatus {
    IssuedFirst(Vec<Barrier>),
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

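/// Per-actor bookkeeping: the senders used to deliver barriers to the actor,
/// the barriers issued but not yet collected (keyed by `epoch.prev`), and the
/// join handles of the actor's tokio tasks.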
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

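    /// Issue a new barrier to this actor; epochs must strictly increase.
    /// `is_stop` marks that the actor is expected to exit after collecting
    /// all barriers currently in flight.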
    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(
                !self.is_stopping,
                "should not issue a barrier to an actor that is already stopping"
            );
            self.is_stopping = true;
        }
        Ok(())
    }

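    /// Mark the oldest in-flight barrier as collected from this actor.
    /// Returns the owning partial graph id and whether the actor has finished
    /// (stopping with no barrier left in flight).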
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

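/// Barrier state of one partial graph: in-flight barriers keyed by
/// `epoch.prev`, plus the per-epoch progress reports (mview creation, source
/// list/load completion, CDC table backfill, table truncation and refresh)
/// that are attached to each barrier once all of its actors have collected it.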
pub(crate) struct PartialGraphManagedBarrierState {
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            create_mview_progress: Default::default(),
            list_finished_source_ids: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            truncate_tables: Default::default(),
            refresh_finished_tables: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

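/// A database whose barrier processing has been suspended after a failure,
/// retained until a reset request arrives. `reset` drains remaining actor
/// failures to find the root cause, aborts all actors, and clears the
/// database's tables from the state store.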
pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

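/// Lifecycle of a database on this worker. Exchange requests that arrive
/// before the database starts running are buffered in
/// `ReceivedExchangeRequest`; `Unspecified` is a transient placeholder used
/// while swapping states in `suspend` and `start_reset` and must never be
/// observed.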
pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    %database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    %database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    %database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

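/// Top-level barrier state of a worker node: one `DatabaseStatus` per database.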
#[derive(Default)]
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

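/// Barrier state of a single running database: the in-flight actors and the
/// partial graphs they belong to, plus the channels through which actors
/// report barrier events and failures back to the barrier worker.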
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: Default::default(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

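        // In tests, actors listed in `actor_ids_to_collect` may never have been
        // built via `actors_to_build`; spawn a placeholder task for each missing
        // one so that the bookkeeping below still applies to them.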
        if cfg!(test) {
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(
                                    actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

        for &actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.global_config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

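    /// Poll the next event for this database: actor failures take priority,
    /// then barriers that have become fully collected, then barrier events
    /// reported by the actors (which may in turn complete a barrier).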
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
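    /// Collect `epoch` from `actor_id`, removing the actor once it has no
    /// barrier left in flight and is stopping. Returns the barrier of the
    /// owning partial graph if this collection completed it for all actors.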
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    #[allow(clippy::type_complexity)]
    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> BarrierToComplete {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

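    /// Collect failures from the remaining actors for up to a 3-second grace
    /// period, then return the highest-scored error as the most likely root
    /// cause.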
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

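    /// Record that an actor has finished listing a source (the symmetric
    /// `report_source_load_finished` below records load completion). The
    /// report is attached to the epoch's partial graph state; if the actor or
    /// its in-flight barrier is no longer tracked, the report is ignored with
    /// a warning.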
    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbListFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
            );
        }
    }

    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbLoadFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
            );
        }
    }

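    /// Record that a table refresh has finished: mark `table_id` as
    /// refresh-finished and schedule `staging_table_id` for truncation at
    /// `epoch.curr`. Unknown actors, barriers, or graphs are ignored with a
    /// warning.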
    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    ) {
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) else {
            let inflight_barriers = actor_state.inflight_barriers.keys().collect::<Vec<_>>();
            warn!(
                ?epoch,
                %actor_id,
                %table_id,
                ?inflight_barriers,
                "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
            );
            return;
        };
        let Some(graph_state) = self.graph_states.get_mut(partial_graph_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: graph_state not found"
            );
            return;
        };
        graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}

impl PartialGraphManagedBarrierState {
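    /// Scan in-flight barriers in epoch order; transition the first `Issued`
    /// barrier whose remaining actor set is empty to `AllCollected` (taking
    /// the per-epoch progress reports with it) and return its barrier.
    /// Stops at the first barrier still waiting on actors.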
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}

pub(crate) struct BarrierToComplete {
    pub barrier: Barrier,
    pub table_ids: Option<HashSet<TableId>>,
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    pub truncate_tables: Vec<TableId>,
    pub refresh_finished_tables: Vec<TableId>,
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

impl PartialGraphManagedBarrierState {
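    /// Remove `actor_id` from the remaining actors of the barrier keyed by
    /// `epoch.prev`. Panics if the barrier is untracked or already collected.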
    pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
        let actor_id = actor_id.into();
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

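    /// Register a newly injected barrier as `Issued` with the actors to
    /// collect it from. For checkpoint barriers, the barrier also carries the
    /// table ids to sync, taken from the previous barrier when the epochs are
    /// contiguous.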
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(&mut BarrierState { ref inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into(), 3.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into(), 3.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}