1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::fmt::{Debug, Display, Formatter};
17use std::future::{Future, pending, poll_fn};
18use std::mem::replace;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use futures::future::BoxFuture;
25use futures::stream::{FuturesOrdered, FuturesUnordered};
26use futures::{FutureExt, StreamExt};
27use prometheus::HistogramTimer;
28use risingwave_common::catalog::TableId;
29use risingwave_common::id::SourceId;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_pb::stream_plan::barrier::BarrierKind;
32use risingwave_pb::stream_service::barrier_complete_response::{
33 PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
34 PbListFinishedSource, PbLoadFinishedSource,
35};
36use risingwave_storage::StateStoreImpl;
37use tokio::sync::mpsc;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
39use tokio::task::JoinHandle;
40
41use crate::error::{StreamError, StreamResult};
42use crate::executor::Barrier;
43use crate::executor::monitor::StreamingMetrics;
44use crate::task::progress::BackfillState;
45use crate::task::{
46 ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
47 StreamActorManager, UpDownActorIds,
48};
49
/// Per-barrier bookkeeping while a barrier is in flight: which actors still
/// have to report collection, plus the latency timer started when the barrier
/// was issued.
struct IssuedState {
    /// Actors that were issued this barrier but have not yet collected it.
    pub remaining_actors: BTreeSet<ActorId>,

    /// Measures issue-to-full-collection latency; observed when the barrier
    /// transitions to `AllCollected` in `may_have_collected_all`.
    pub barrier_inflight_latency: HistogramTimer,
}
56
57impl Debug for IssuedState {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("IssuedState")
60 .field("remaining_actors", &self.remaining_actors)
61 .finish()
62 }
63}
64
/// Lifecycle of a single tracked barrier.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// The barrier has been injected and is awaiting collection from the
    /// actors recorded in [`IssuedState::remaining_actors`].
    Issued(IssuedState),

    /// Every actor has collected the barrier; the per-epoch progress reports
    /// drained from the graph state are staged here until the barrier is
    /// popped for completion.
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    },
}
82
/// State kept per in-flight barrier in the epoch map of
/// [`PartialGraphManagedBarrierState`].
#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    // NOTE(review): presumably the table ids to sync for this barrier; it is
    // populated outside this view (`graph_state.transform_to_issued` is
    // defined elsewhere) — confirm against that method.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}
90
91use risingwave_common::must_match;
92use risingwave_pb::id::FragmentId;
93use risingwave_pb::stream_service::InjectBarrierRequest;
94
95use crate::executor::exchange::permit;
96use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
97use crate::task::barrier_worker::{ScoredStreamError, TakeReceiverRequest};
98use crate::task::cdc_progress::CdcTableBackfillState;
99
/// Borrowed snapshot of a partial graph's barrier state used for debug
/// printing (see the `Display` impl and `PartialGraphState::to_debug_info`).
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    /// Ids of all actors currently tracked as running.
    running_actors: BTreeSet<ActorId>,
    graph_state: &'a PartialGraphManagedBarrierState,
}
104
105impl Display for ManagedBarrierStateDebugInfo<'_> {
106 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107 write!(f, "running_actors: ")?;
108 for actor_id in &self.running_actors {
109 write!(f, "{}, ", actor_id)?;
110 }
111 {
112 writeln!(f, "graph states")?;
113 write!(f, "{}", self.graph_state)?;
114 }
115 Ok(())
116 }
117}
118
/// Debug rendering of the per-epoch barrier states, compressing actors that
/// are still pending in consecutive epochs.
impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Epoch printed in the previous loop iteration (0 = none yet), used to
        // avoid re-listing actors that are also remaining in that epoch.
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            is_prev_epoch_issued = true;
                            // Print only actors not already pending in the
                            // previous epoch; summarize the rest as a count.
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    // No issued previous epoch to diff against: list all.
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        // Append any outstanding create-mview progress, grouped by epoch.
        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}
182
/// Barrier-issuing status of a single actor.
enum InflightActorStatus {
    /// Warm-up phase: all barriers issued since the actor was spawned are
    /// buffered so they can be replayed to a barrier sender registered later
    /// (see `register_barrier_sender`). Invariant: non-empty.
    IssuedFirst(Vec<Barrier>),
    /// The actor has collected its first barrier; only the latest issued
    /// `epoch.prev` is remembered.
    Running(u64),
}
189
190impl InflightActorStatus {
191 fn max_issued_epoch(&self) -> u64 {
192 match self {
193 InflightActorStatus::Running(epoch) => *epoch,
194 InflightActorStatus::IssuedFirst(issued_barriers) => {
195 issued_barriers.last().expect("non-empty").epoch.prev
196 }
197 }
198 }
199}
200
/// Book-keeping for one running actor within a [`PartialGraphState`].
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    /// Senders registered via `register_barrier_sender`; every issued barrier
    /// is forwarded to each of them.
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `epoch.prev` of issued-but-not-yet-collected barriers, in issue order.
    pub(in crate::task) inflight_barriers: VecDeque<u64>,
    status: InflightActorStatus,
    /// Set when a stop barrier is issued; once all inflight barriers are
    /// collected, a stopping actor is considered finished and removed.
    is_stopping: bool,

    /// Channel through which new output (exchange) requests reach the actor.
    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}
214
215impl InflightActorState {
216 pub(super) fn start(
217 actor_id: ActorId,
218 initial_barrier: &Barrier,
219 new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
220 join_handle: JoinHandle<()>,
221 monitor_task_handle: Option<JoinHandle<()>>,
222 ) -> Self {
223 Self {
224 actor_id,
225 barrier_senders: vec![],
226 inflight_barriers: VecDeque::from_iter([initial_barrier.epoch.prev]),
227 status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
228 is_stopping: false,
229 new_output_request_tx,
230 join_handle,
231 monitor_task_handle,
232 }
233 }
234
235 pub(super) fn issue_barrier(&mut self, barrier: &Barrier, is_stop: bool) -> StreamResult<()> {
236 assert!(barrier.epoch.prev > self.status.max_issued_epoch());
237
238 for barrier_sender in &self.barrier_senders {
239 barrier_sender.send(barrier.clone()).map_err(|_| {
240 StreamError::barrier_send(
241 barrier.clone(),
242 self.actor_id,
243 "failed to send to registered sender",
244 )
245 })?;
246 }
247
248 if let Some(prev_epoch) = self.inflight_barriers.back() {
249 assert!(*prev_epoch < barrier.epoch.prev);
250 }
251 self.inflight_barriers.push_back(barrier.epoch.prev);
252
253 match &mut self.status {
254 InflightActorStatus::IssuedFirst(pending_barriers) => {
255 pending_barriers.push(barrier.clone());
256 }
257 InflightActorStatus::Running(prev_epoch) => {
258 *prev_epoch = barrier.epoch.prev;
259 }
260 };
261
262 if is_stop {
263 assert!(!self.is_stopping, "stopped actor should not issue barrier");
264 self.is_stopping = true;
265 }
266 Ok(())
267 }
268
269 pub(super) fn collect(&mut self, epoch: EpochPair) -> bool {
270 let prev_epoch = self.inflight_barriers.pop_front().expect("should exist");
271 assert_eq!(prev_epoch, epoch.prev);
272 match &self.status {
273 InflightActorStatus::IssuedFirst(pending_barriers) => {
274 assert_eq!(
275 prev_epoch,
276 pending_barriers.first().expect("non-empty").epoch.prev
277 );
278 self.status = InflightActorStatus::Running(
279 pending_barriers.last().expect("non-empty").epoch.prev,
280 );
281 }
282 InflightActorStatus::Running(_) => {}
283 }
284
285 self.inflight_barriers.is_empty() && self.is_stopping
286 }
287}
288
/// Barrier-collection state of a single partial graph: in-flight barriers per
/// epoch, plus the per-epoch progress/report payloads accumulated while each
/// barrier is in flight (drained once the barrier is fully collected).
pub(crate) struct PartialGraphManagedBarrierState {
    /// Per-barrier state keyed by the barrier's `epoch.prev`, in epoch order.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    // NOTE(review): presumably the (epoch, table ids) of the previously issued
    // barrier; set/read outside this view — confirm against the graph-state
    // `transform_to_issued` implementation.
    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Create-mview backfill progress per epoch: actor -> (fragment, state).
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    /// Sources reported as having finished listing, grouped by epoch.
    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    /// Sources reported as having finished loading, grouped by epoch.
    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    /// CDC table backfill progress per epoch, keyed by the reporting actor.
    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// CDC source offset updates reported per epoch.
    pub(crate) cdc_source_offset_updated: HashMap<u64, Vec<PbCdcSourceOffsetUpdated>>,

    /// Staging tables to truncate per epoch (filled by refresh-finished reports).
    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    /// Tables whose refresh finished, per epoch.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}
333
334impl PartialGraphManagedBarrierState {
335 pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
336 Self::new_inner(
337 actor_manager.env.state_store(),
338 actor_manager.streaming_metrics.clone(),
339 )
340 }
341
342 fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
343 Self {
344 epoch_barrier_state_map: Default::default(),
345 prev_barrier_table_ids: None,
346 create_mview_progress: Default::default(),
347 list_finished_source_ids: Default::default(),
348 load_finished_source_ids: Default::default(),
349 cdc_table_backfill_progress: Default::default(),
350 cdc_source_offset_updated: Default::default(),
351 truncate_tables: Default::default(),
352 refresh_finished_tables: Default::default(),
353 state_store,
354 streaming_metrics,
355 }
356 }
357
358 #[cfg(test)]
359 pub(crate) fn for_test() -> Self {
360 Self::new_inner(
361 StateStoreImpl::for_test(),
362 Arc::new(StreamingMetrics::unused()),
363 )
364 }
365
366 pub(super) fn is_empty(&self) -> bool {
367 self.epoch_barrier_state_map.is_empty()
368 }
369}
370
/// A partial graph whose execution has been suspended (after a failure, or in
/// preparation for a reset), retained until the reset is performed.
pub(crate) struct SuspendedPartialGraphState {
    /// When the graph was suspended; logged when the reset eventually starts.
    pub(super) suspend_time: Instant,
    inner: PartialGraphState,
    /// First observed failure, if any: (failing actor id when known, error).
    failure: Option<(Option<ActorId>, StreamError)>,
}
376
impl SuspendedPartialGraphState {
    fn new(
        state: PartialGraphState,
        failure: Option<(Option<ActorId>, StreamError)>,
        // Dropped on purpose: any in-flight epoch-completing futures are
        // cancelled when the graph is suspended.
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    /// Tear the graph down: first wait (bounded, see
    /// `try_find_root_actor_failure`) to identify the root failure among actor
    /// errors, then abort and join all actor tasks.
    async fn reset(mut self) -> ResetPartialGraphOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        ResetPartialGraphOutput { root_err }
    }
}
396
/// Result of resetting a partial graph: the highest-scored error, if any,
/// judged to be the likely root cause of the failure.
pub(crate) struct ResetPartialGraphOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}
400
/// Lifecycle of one partial graph as tracked by [`ManagedBarrierState`].
pub(in crate::task) enum PartialGraphStatus {
    /// Exchange requests arrived before the graph was created; buffered as
    /// (up/down actor pair, request) until then.
    ReceivedExchangeRequest(Vec<(UpDownActorIds, TakeReceiverRequest)>),
    /// The graph is running normally.
    Running(PartialGraphState),
    /// The graph has been suspended (typically on failure) pending a reset.
    Suspended(SuspendedPartialGraphState),
    /// The graph is being reset; its future lives in
    /// `ManagedBarrierState::resetting_graphs`.
    Resetting,
    /// Transient placeholder used while moving the state between variants
    /// (see `suspend`); observing it anywhere else is a bug.
    Unspecified,
}
409
impl PartialGraphStatus {
    /// Abort the graph in whatever state it is in: fail buffered exchange
    /// requests, or abort and join all actors of a running/suspended graph.
    pub(crate) async fn abort(&mut self) {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests.drain(..) {
                    if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                        let _ = result_sender.send(Err(anyhow!("partial graph aborted").into()));
                    }
                }
            }
            PartialGraphStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Suspended(SuspendedPartialGraphState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            // A resetting graph is already being torn down elsewhere.
            PartialGraphStatus::Resetting => {}
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// The mutable running state, if requests may currently be handled.
    /// Returns `None` while suspended (the caller drops the request).
    pub(crate) fn state_for_request(&mut self) -> Option<&mut PartialGraphState> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            PartialGraphStatus::Running(state) => Some(state),
            PartialGraphStatus::Suspended(_) => None,
            PartialGraphStatus::Resetting => {
                unreachable!("should not receive further request during cleaning")
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Poll for the next event; only a `Running` graph produces events.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            PartialGraphStatus::Running(state) => state.poll_next_event(cx),
            PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting => Poll::Pending,
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Transition `Running` -> `Suspended`, recording the triggering failure.
    /// Panics (via `must_match!`) if the graph is not currently `Running`.
    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        // Park `Unspecified` in `self` temporarily to take ownership of the
        // running state; it is replaced with `Suspended` immediately below.
        let state = must_match!(replace(self, PartialGraphStatus::Unspecified), PartialGraphStatus::Running(state) => state);
        *self = PartialGraphStatus::Suspended(SuspendedPartialGraphState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    /// Transition into `Resetting` and return the future that performs the
    /// reset. The graph's table ids are added to `table_ids_to_clear`.
    pub(super) fn start_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        table_ids_to_clear: &mut HashSet<TableId>,
    ) -> BoxFuture<'static, ResetPartialGraphOutput> {
        match replace(self, PartialGraphStatus::Resetting) {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                // No actors were ever started; just fail the buffered requests.
                for (_, request) in pending_requests {
                    if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                        let _ = result_sender.send(Err(anyhow!("partial graph reset").into()));
                    }
                }
                async move { ResetPartialGraphOutput { root_err: None } }.boxed()
            }
            PartialGraphStatus::Running(state) => {
                assert_eq!(partial_graph_id, state.partial_graph_id);
                info!(
                    %partial_graph_id,
                    "start partial graph reset from Running"
                );
                table_ids_to_clear.extend(state.table_ids.iter().copied());
                // Wrap in a suspended state (with no recorded failure) so the
                // same reset path is used as for suspended graphs.
                SuspendedPartialGraphState::new(state, None, completing_futures)
                    .reset()
                    .boxed()
            }
            PartialGraphStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been clear when suspended"
                );
                assert_eq!(partial_graph_id, state.inner.partial_graph_id);
                info!(
                    %partial_graph_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start partial graph reset after suspended"
                );
                table_ids_to_clear.extend(state.inner.table_ids.iter().copied());
                state.reset().boxed()
            }
            PartialGraphStatus::Resetting => {
                unreachable!("should not reset for twice");
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }
}
526
/// Top-level barrier state of the worker: all partial graphs by id, plus the
/// join handles of graph resets running as background tasks.
#[derive(Default)]
pub(in crate::task) struct ManagedBarrierState {
    pub(super) partial_graphs: HashMap<PartialGraphId, PartialGraphStatus>,
    pub(super) resetting_graphs:
        FuturesUnordered<JoinHandle<Vec<(PartialGraphId, ResetPartialGraphOutput)>>>,
}
533
/// Events surfaced from the barrier state to its driver.
pub(super) enum ManagedBarrierStateEvent {
    /// A barrier has been collected from every actor of the partial graph.
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    /// An actor of the partial graph reported a failure.
    ActorError {
        partial_graph_id: PartialGraphId,
        actor_id: ActorId,
        err: StreamError,
    },
    /// A batch of partial graphs finished resetting.
    PartialGraphsReset(Vec<(PartialGraphId, ResetPartialGraphOutput)>),
    /// A local upstream-output registration that carries the upstream's
    /// partial graph id, so the driver can route it to the right graph.
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        upstream_partial_graph_id: PartialGraphId,
        tx: permit::Sender,
    },
}
552
impl ManagedBarrierState {
    /// A future that resolves to the next event from any partial graph, or to
    /// the completion of a batch of background graph resets.
    pub(super) fn next_event(&mut self) -> impl Future<Output = ManagedBarrierStateEvent> + '_ {
        poll_fn(|cx| {
            // Live graphs are polled before reset completions, so barrier and
            // actor events are delivered first.
            for graph in self.partial_graphs.values_mut() {
                if let Poll::Ready(event) = graph.poll_next_event(cx) {
                    return Poll::Ready(event);
                }
            }
            if let Poll::Ready(Some(result)) = self.resetting_graphs.poll_next_unpin(cx) {
                let outputs = result.expect("failed to join resetting future");
                // Every graph in the finished batch must still be registered
                // as `Resetting`; remove the entries now that reset is done.
                for (partial_graph_id, _) in &outputs {
                    let PartialGraphStatus::Resetting = self
                        .partial_graphs
                        .remove(partial_graph_id)
                        .expect("should exist")
                    else {
                        panic!("should be resetting")
                    };
                }
                return Poll::Ready(ManagedBarrierStateEvent::PartialGraphsReset(outputs));
            }
            Poll::Pending
        })
    }
}
578
/// Runtime state of one partial graph: its actors, its barrier state, and the
/// channels connecting it to the actors' [`LocalBarrierManager`].
pub(crate) struct PartialGraphState {
    partial_graph_id: PartialGraphId,
    /// All actors currently running in this graph.
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    /// Output requests targeting actors not yet spawned, keyed by the
    /// (future) upstream actor id; flushed when the actor is built.
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_state: PartialGraphManagedBarrierState,

    /// Every table id ever requested to sync for this graph; used to decide
    /// which tables to clear when the graph is reset.
    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    /// Events reported by actors through the local barrier manager.
    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    /// Actor failures reported through the local barrier manager.
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}
600
601impl PartialGraphState {
602 pub(super) fn new(
604 partial_graph_id: PartialGraphId,
605 term_id: String,
606 actor_manager: Arc<StreamActorManager>,
607 ) -> Self {
608 let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
609 LocalBarrierManager::new(term_id, actor_manager.env.clone());
610 Self {
611 partial_graph_id,
612 actor_states: Default::default(),
613 actor_pending_new_output_requests: Default::default(),
614 graph_state: PartialGraphManagedBarrierState::new(&actor_manager),
615 table_ids: Default::default(),
616 actor_manager,
617 local_barrier_manager,
618 barrier_event_rx,
619 actor_failure_rx,
620 }
621 }
622
623 pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
624 ManagedBarrierStateDebugInfo {
625 running_actors: self.actor_states.keys().cloned().collect(),
626 graph_state: &self.graph_state,
627 }
628 }
629
630 async fn abort_and_wait_actors(&mut self) {
631 for (actor_id, state) in &self.actor_states {
632 tracing::debug!("force stopping actor {}", actor_id);
633 state.join_handle.abort();
634 if let Some(monitor_task_handle) = &state.monitor_task_handle {
635 monitor_task_handle.abort();
636 }
637 }
638
639 for (actor_id, state) in self.actor_states.drain() {
640 tracing::debug!("join actor {}", actor_id);
641 let result = state.join_handle.await;
642 assert!(result.is_ok() || result.unwrap_err().is_cancelled());
643 }
644 }
645}
646
647impl InflightActorState {
648 pub(super) fn register_barrier_sender(
649 &mut self,
650 tx: mpsc::UnboundedSender<Barrier>,
651 ) -> StreamResult<()> {
652 match &self.status {
653 InflightActorStatus::IssuedFirst(pending_barriers) => {
654 for barrier in pending_barriers {
655 tx.send(barrier.clone()).map_err(|_| {
656 StreamError::barrier_send(
657 barrier.clone(),
658 self.actor_id,
659 "failed to send pending barriers to newly registered sender",
660 )
661 })?;
662 }
663 self.barrier_senders.push(tx);
664 }
665 InflightActorStatus::Running(_) => {
666 unreachable!("should not register barrier sender when entering Running status")
667 }
668 }
669 Ok(())
670 }
671}
672
673impl PartialGraphState {
674 pub(super) fn register_barrier_sender(
675 &mut self,
676 actor_id: ActorId,
677 tx: mpsc::UnboundedSender<Barrier>,
678 ) -> StreamResult<()> {
679 self.actor_states
680 .get_mut(&actor_id)
681 .expect("should exist")
682 .register_barrier_sender(tx)
683 }
684}
685
impl PartialGraphState {
    /// Handle an `InjectBarrierRequest`: register the barrier with the graph
    /// state, spawn the newly built actors (which take this barrier as their
    /// first), and issue the barrier to all pre-existing actors to collect.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        assert_eq!(self.partial_graph_id, request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };

        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        // Remember every table this graph has synced, for clearing on reset.
        self.table_ids.extend(table_ids.iter().cloned());

        self.graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        // Spawn the actors newly built by this barrier. Each fragment's plan
        // node is shared via `Arc` across the fragment's actors.
        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            // A brand-new actor cannot be stopped by the same barrier, must be
            // unique, and must be among the actors to collect from.
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            // Flush output requests that arrived before the actor existed.
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // In tests, back any unknown to-collect actor with a dummy task that
        // never completes, so the collection bookkeeping still works.
        if cfg!(test) {
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(actor_id, barrier, tx, join_handle, None,)
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

        // Issue the barrier to every pre-existing actor we must collect from
        // (new actors already start with this barrier recorded as issued).
        for &actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }

    /// Route a take-receiver request to the upstream actor's output channel.
    /// For remote requests, the permit channel is created here and the
    /// receiving half is sent back through `result_sender`. Requests for
    /// actors not yet spawned are buffered until the actor is built.
    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: TakeReceiverRequest,
    ) {
        let request = match request {
            TakeReceiverRequest::Remote {
                result_sender,
                upstream_fragment_id,
            } => {
                let upstream_fragment_id_str = upstream_fragment_id.to_string();
                // Both channel directions are attributed to the upstream
                // fragment's buffered-bytes metric.
                let fragment_channel_buffered_bytes = self
                    .actor_manager
                    .streaming_metrics
                    .fragment_channel_buffered_bytes
                    .with_guarded_label_values(&[&upstream_fragment_id_str]);
                let (tx, rx) = permit::channel_from_config_with_metrics(
                    self.local_barrier_manager.env.global_config(),
                    permit::ChannelMetrics {
                        sender_actor_channel_buffered_bytes: fragment_channel_buffered_bytes
                            .clone(),
                        receiver_actor_channel_buffered_bytes: fragment_channel_buffered_bytes,
                    },
                );
                let _ = result_sender.send(Ok(rx));
                NewOutputRequest::Remote(tx)
            }
            TakeReceiverRequest::Local(tx) => NewOutputRequest::Local(tx),
        };
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Poll, in priority order: actor failures, already-completed barriers,
    /// then incoming local barrier events.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        // Actor failures take priority over everything else.
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                actor_id,
                err,
                partial_graph_id: self.partial_graph_id,
            });
        }
        {
            // A barrier may have become fully collected since the last poll.
            if let Some(barrier) = self.graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    barrier,
                    partial_graph_id: self.partial_graph_id,
                });
            }
        }
        // Drain all currently available barrier events; report-style events
        // are absorbed into state, others resolve the poll immediately.
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some(barrier) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    upstream_partial_graph_id,
                    tx,
                } => {
                    // Routed upward: the upstream may belong to another graph.
                    return Poll::Ready(ManagedBarrierStateEvent::RegisterLocalUpstreamOutput {
                        actor_id,
                        upstream_actor_id,
                        upstream_partial_graph_id,
                        tx,
                    });
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
                LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
                    epoch,
                    actor_id,
                    source_id,
                } => {
                    self.report_cdc_source_offset_updated(epoch, actor_id, source_id);
                }
            }
        }

        // None of the drained events completed a barrier.
        debug_assert!(self.graph_state.may_have_collected_all().is_none());
        Poll::Pending
    }
}
961
impl PartialGraphState {
    /// Record that `actor_id` collected the barrier of `epoch`. Removes the
    /// actor if it has thereby finished stopping, and returns the barrier if
    /// this collection completed it for the whole graph.
    #[must_use]
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) -> Option<Barrier> {
        let is_finished = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        self.graph_state.collect(actor_id, epoch);
        self.graph_state.may_have_collected_all()
    }

    /// Pop the oldest fully collected barrier (must match `prev_epoch`)
    /// together with its accumulated progress payloads.
    pub(super) fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        self.graph_state.pop_barrier_to_complete(prev_epoch)
    }

    /// Gather further actor failures for up to 3 seconds (stopping early once
    /// all known actors have reported) and return the highest-scored error as
    /// the likely root cause.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // Best-effort collection: the timeout result itself is ignored.
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        // Score the initial failure plus everything gathered afterwards and
        // pick the maximum.
        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Record that an actor finished listing for a source, provided the
    /// reporting actor still has `epoch` in flight; otherwise log and drop.
    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbListFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
            );
        }
    }

    /// Record that an actor finished loading for a source, provided the
    /// reporting actor still has `epoch` in flight; otherwise log and drop.
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbLoadFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
            );
        }
    }

    /// Record a CDC source offset update, provided the reporting actor still
    /// has `epoch` in flight; otherwise log and drop.
    pub(super) fn report_cdc_source_offset_updated(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .cdc_source_offset_updated
                .entry(epoch.curr)
                .or_default()
                .push(PbCdcSourceOffsetUpdated {
                    reporter_actor_id: actor_id.as_raw_id(),
                    source_id: source_id.as_raw_id(),
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %source_id, "ignore cdc source offset updated"
            );
        }
    }

    /// Record a finished table refresh: the refreshed table is marked finished
    /// and its staging table is queued for truncation, both under `epoch.curr`.
    /// Reports from unknown actors or stale epochs are logged and dropped.
    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    ) {
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        if !actor_state.inflight_barriers.contains(&epoch.prev) {
            warn!(
                ?epoch,
                %actor_id,
                %table_id,
                inflight_barriers = ?actor_state.inflight_barriers,
                "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
            );
            return;
        };
        self.graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        self.graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}
1137
1138impl PartialGraphManagedBarrierState {
    /// Scan barriers in epoch order; finalize the first still-`Issued` barrier
    /// whose remaining actor set is empty: drain its per-epoch progress
    /// reports, flip it to `AllCollected`, stop its in-flight latency timer,
    /// and return its barrier. Returns `None` if no barrier newly completed.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                // Fully collected but not yet finalized: fall through below.
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                // Already finalized earlier; check the next epoch.
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                // Stop scanning at the first still-incomplete barrier.
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            // Drain all per-epoch reports accumulated for this barrier,
            // keyed by the barrier's `epoch.curr`.
            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let cdc_source_offset_updated = self
                .cdc_source_offset_updated
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                },
            );

            // The previous state must have been `Issued`; observe its
            // issue-to-collection latency timer.
            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }
1226
1227 fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
1228 let (popped_prev_epoch, barrier_state) = self
1229 .epoch_barrier_state_map
1230 .pop_first()
1231 .expect("should exist");
1232
1233 assert_eq!(prev_epoch, popped_prev_epoch);
1234
1235 let (
1236 create_mview_progress,
1237 list_finished_source_ids,
1238 load_finished_source_ids,
1239 cdc_table_backfill_progress,
1240 cdc_source_offset_updated,
1241 truncate_tables,
1242 refresh_finished_tables,
1243 ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
1244 create_mview_progress,
1245 list_finished_source_ids,
1246 load_finished_source_ids,
1247 truncate_tables,
1248 refresh_finished_tables,
1249 cdc_table_backfill_progress,
1250 cdc_source_offset_updated,
1251 } => {
1252 (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, cdc_source_offset_updated, truncate_tables, refresh_finished_tables)
1253 });
1254 BarrierToComplete {
1255 barrier: barrier_state.barrier,
1256 table_ids: barrier_state.table_ids,
1257 create_mview_progress,
1258 list_finished_source_ids,
1259 load_finished_source_ids,
1260 truncate_tables,
1261 refresh_finished_tables,
1262 cdc_table_backfill_progress,
1263 cdc_source_offset_updated,
1264 }
1265 }
1266}
1267
/// Bundle produced by `pop_barrier_to_complete` once a barrier has been collected from all
/// of its actors, carrying the payloads reported while the barrier was inflight.
pub(crate) struct BarrierToComplete {
    /// The fully collected barrier.
    pub barrier: Barrier,
    /// Table ids recorded for this barrier; `None` for non-checkpoint barriers
    /// (see `transform_to_issued`).
    pub table_ids: Option<HashSet<TableId>>,
    /// Per-actor materialized-view creation progress (from `BackfillState::to_pb`).
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    /// Sources that finished listing at this epoch.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Sources that finished loading at this epoch.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// Staging tables to truncate, queued by `report_refresh_finished`.
    pub truncate_tables: Vec<TableId>,
    /// Tables whose refresh finished, queued by `report_refresh_finished`.
    pub refresh_finished_tables: Vec<TableId>,
    /// Per-actor CDC table backfill progress for this epoch.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    /// CDC source offset updates reported at this epoch.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
}
1279
1280impl PartialGraphManagedBarrierState {
1281 pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
1283 let actor_id = actor_id.into();
1284 tracing::debug!(
1285 target: "events::stream::barrier::manager::collect",
1286 ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
1287 "collect_barrier",
1288 );
1289
1290 match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
1291 None => {
1292 panic!(
1296 "cannot collect new actor barrier {:?} at current state: None",
1297 epoch,
1298 )
1299 }
1300 Some(&mut BarrierState {
1301 ref barrier,
1302 inner:
1303 ManagedBarrierStateInner::Issued(IssuedState {
1304 ref mut remaining_actors,
1305 ..
1306 }),
1307 ..
1308 }) => {
1309 let exist = remaining_actors.remove(&actor_id);
1310 assert!(
1311 exist,
1312 "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
1313 actor_id, epoch.curr
1314 );
1315 assert_eq!(barrier.epoch.curr, epoch.curr);
1316 }
1317 Some(BarrierState { inner, .. }) => {
1318 panic!(
1319 "cannot collect new actor barrier {:?} at current state: {:?}",
1320 epoch, inner
1321 )
1322 }
1323 }
1324 }
1325
1326 pub(super) fn transform_to_issued(
1329 &mut self,
1330 barrier: &Barrier,
1331 actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
1332 table_ids: HashSet<TableId>,
1333 ) {
1334 let timer = self
1335 .streaming_metrics
1336 .barrier_inflight_latency
1337 .start_timer();
1338
1339 if let Some(hummock) = self.state_store.as_hummock() {
1340 hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
1341 }
1342
1343 let table_ids = match barrier.kind {
1344 BarrierKind::Unspecified => {
1345 unreachable!()
1346 }
1347 BarrierKind::Initial => {
1348 assert!(
1349 self.prev_barrier_table_ids.is_none(),
1350 "non empty table_ids at initial barrier: {:?}",
1351 self.prev_barrier_table_ids
1352 );
1353 info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
1354 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1355 None
1356 }
1357 BarrierKind::Barrier => {
1358 if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
1359 assert_eq!(prev_epoch.curr, barrier.epoch.prev);
1360 assert_eq!(prev_table_ids, &table_ids);
1361 *prev_epoch = barrier.epoch;
1362 } else {
1363 info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
1364 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1365 }
1366 None
1367 }
1368 BarrierKind::Checkpoint => Some(
1369 if let Some((prev_epoch, prev_table_ids)) = self
1370 .prev_barrier_table_ids
1371 .replace((barrier.epoch, table_ids))
1372 && prev_epoch.curr == barrier.epoch.prev
1373 {
1374 prev_table_ids
1375 } else {
1376 debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
1377 HashSet::new()
1378 },
1379 ),
1380 };
1381
1382 if let Some(&mut BarrierState { ref inner, .. }) =
1383 self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
1384 {
1385 {
1386 panic!(
1387 "barrier epochs{:?} state has already been `Issued`. Current state: {:?}",
1388 barrier.epoch, inner
1389 );
1390 }
1391 };
1392
1393 self.epoch_barrier_state_map.insert(
1394 barrier.epoch.prev,
1395 BarrierState {
1396 barrier: barrier.clone(),
1397 inner: ManagedBarrierStateInner::Issued(IssuedState {
1398 remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
1399 barrier_inflight_latency: timer,
1400 }),
1401 table_ids,
1402 },
1403 );
1404 }
1405
1406 #[cfg(test)]
1407 async fn pop_next_completed_epoch(&mut self) -> u64 {
1408 if let Some(barrier) = self.may_have_collected_all() {
1409 self.pop_barrier_to_complete(barrier.epoch.prev);
1410 return barrier.epoch.prev;
1411 }
1412 pending().await
1413 }
1414}
1415
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    /// Prev epoch of the earliest barrier still tracked by `state`.
    fn earliest_pending_epoch(state: &PartialGraphManagedBarrierState) -> u64 {
        *state.epoch_barrier_state_map.first_key_value().unwrap().0
    }

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));

        // Barriers 1 and 2 are collected by actors {1, 2}; barrier 3 adds actor 3.
        state.transform_to_issued(&barrier1, HashSet::from([1.into(), 2.into()]), HashSet::new());
        state.transform_to_issued(&barrier2, HashSet::from([1.into(), 2.into()]), HashSet::new());
        state.transform_to_issued(
            &barrier3,
            HashSet::from([1.into(), 2.into(), 3.into()]),
            HashSet::new(),
        );

        // Barrier 1 completes once both of its actors collect it.
        state.collect(1, barrier1.epoch);
        state.collect(2, barrier1.epoch);
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(0));
        assert_eq!(earliest_pending_epoch(&state), test_epoch(1));

        // Collecting barrier 3 early does not let it overtake barrier 2.
        state.collect(1, barrier2.epoch);
        state.collect(1, barrier3.epoch);
        state.collect(2, barrier2.epoch);
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(1));
        assert_eq!(earliest_pending_epoch(&state), test_epoch(2));

        state.collect(2, barrier3.epoch);
        state.collect(3, barrier3.epoch);
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(2));
        assert!(state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));

        // The actor set shrinks over time: actor 4 leaves after barrier 1, actor 3 after
        // barrier 2.
        state.transform_to_issued(
            &barrier1,
            HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]),
            HashSet::new(),
        );
        state.transform_to_issued(
            &barrier2,
            HashSet::from([1.into(), 2.into(), 3.into()]),
            HashSet::new(),
        );
        state.transform_to_issued(&barrier3, HashSet::from([1.into(), 2.into()]), HashSet::new());

        // Actors 1 and 2 collect all barriers; barrier 1 still waits on actors 3 and 4.
        state.collect(1, barrier1.epoch);
        state.collect(1, barrier2.epoch);
        state.collect(1, barrier3.epoch);
        state.collect(2, barrier1.epoch);
        state.collect(2, barrier2.epoch);
        state.collect(2, barrier3.epoch);
        assert_eq!(earliest_pending_epoch(&state), 0);

        // Actor 3 collects its last two barriers; barrier 1 still waits on actor 4.
        state.collect(3, barrier1.epoch);
        state.collect(3, barrier2.epoch);
        assert_eq!(earliest_pending_epoch(&state), 0);

        // Once actor 4 collects barrier 1, all three barriers complete in order.
        state.collect(4, barrier1.epoch);
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(0));
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(1));
        assert_eq!(state.pop_next_completed_epoch().await, test_epoch(2));
        assert!(state.epoch_barrier_state_map.is_empty());
    }
}