risingwave_stream/task/barrier_worker/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::BoxFuture;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use prometheus::HistogramTimer;
use risingwave_common::catalog::TableId;
use risingwave_common::id::SourceId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of the local barrier manager.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// The meta service has issued a `send_barrier` request; we are now collecting the barrier
    /// from actors.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors.
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    },
}

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// `Some(_)` only when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::InjectBarrierRequest;

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::barrier_worker::{ScoredStreamError, TakeReceiverRequest};
use crate::task::cdc_progress::CdcTableBackfillState;

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_state: &'a PartialGraphManagedBarrierState,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        {
            writeln!(f, "graph states")?;
            write!(f, "{}", self.graph_state)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier.
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier.
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}
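
// Status transition sketch: an actor starts in `IssuedFirst`, buffering every
// barrier issued before its first collection so that a late-registered barrier
// sender can be replayed the full backlog; once the first barrier is
// collected, the backlog is dropped and the actor runs as
// `Running(max_issued_prev_epoch)`.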

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch`s of inflight barriers, in issue order: `push_back` on issue
    /// and `pop_front` on collect.
    pub(in crate::task) inflight_barriers: VecDeque<u64>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier.
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: VecDeque::from_iter([initial_barrier.epoch.prev]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

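    /// Issue `barrier` to this actor: forward it to all registered barrier
    /// senders and record its `prev_epoch` as inflight. Issued epochs must be
    /// strictly increasing. When `is_stop` is set, the actor is marked as
    /// stopping and will be removed once all inflight barriers are collected.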
    pub(super) fn issue_barrier(&mut self, barrier: &Barrier, is_stop: bool) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        if let Some(prev_epoch) = self.inflight_barriers.back() {
            assert!(*prev_epoch < barrier.epoch.prev);
        }
        self.inflight_barriers.push_back(barrier.epoch.prev);

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

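    /// Collect a barrier with the given `epoch` from this actor. Barriers must
    /// be collected in issue order. Returns `true` when the actor has no more
    /// inflight barriers and has been issued a stop barrier, i.e. it can be
    /// removed from `actor_states`.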
    pub(super) fn collect(&mut self, epoch: EpochPair) -> bool {
        let prev_epoch = self.inflight_barriers.pop_front().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }

        self.inflight_barriers.is_empty() && self.is_stopping
    }
}

/// Part of [`PartialGraphState`].
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is the barrier's `prev_epoch`, and the value tracks the barrier
    /// itself along with its collection state.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// The process of progress reporting is as follows:
    /// 1. updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`]
    /// 2. converted to [`ManagedBarrierStateInner`] in [`Self::may_have_collected_all`]
    /// 3. handled by [`Self::pop_barrier_to_complete`]
    /// 4. put in [`crate::task::barrier_worker::BarrierCompleteResult`] and reported to meta.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    /// Record the source list-finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch sources. The map key is the epoch and the value is
    /// the list of pb messages reported by actors.
    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    /// Record the source load-finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch sources. The map key is the epoch and the value is
    /// the list of pb messages reported by actors.
    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// Record the tables to truncate for each epoch of concurrent checkpoints.
    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    /// Record the tables that have finished refresh for each epoch of concurrent checkpoints.
    /// Used for materialized view refresh completion reporting.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            create_mview_progress: Default::default(),
            list_finished_source_ids: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            truncate_tables: Default::default(),
            refresh_finished_tables: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}
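
// Illustrative per-epoch lifecycle (mirrors the tests at the bottom of this
// file): a barrier is registered via `transform_to_issued`, collected once per
// actor via `collect`, surfaced by `may_have_collected_all` once the remaining
// actor set becomes empty, and finally drained with `pop_barrier_to_complete`:
//
//     let mut state = PartialGraphManagedBarrierState::for_test();
//     state.transform_to_issued(&barrier, actor_ids, HashSet::new());
//     state.collect(actor_id, barrier.epoch); // once per actor to collect
//     // `Some(barrier)` once all actors have collected:
//     let collected = state.may_have_collected_all();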

pub(crate) struct SuspendedPartialGraphState {
    pub(super) suspend_time: Instant,
    inner: PartialGraphState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedPartialGraphState {
    fn new(
        state: PartialGraphState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetPartialGraphOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        ResetPartialGraphOutput { root_err }
    }
}

pub(crate) struct ResetPartialGraphOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

pub(in crate::task) enum PartialGraphStatus {
    ReceivedExchangeRequest(Vec<(UpDownActorIds, TakeReceiverRequest)>),
    Running(PartialGraphState),
    Suspended(SuspendedPartialGraphState),
    Resetting,
    /// Temporary placeholder.
    Unspecified,
}

impl PartialGraphStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests.drain(..) {
                    if let TakeReceiverRequest::Remote(sender) = request {
                        let _ = sender.send(Err(anyhow!("partial graph aborted").into()));
                    }
                }
            }
            PartialGraphStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Suspended(SuspendedPartialGraphState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Resetting => {}
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut PartialGraphState> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            PartialGraphStatus::Running(state) => Some(state),
            PartialGraphStatus::Suspended(_) => None,
            PartialGraphStatus::Resetting => {
                unreachable!("should not receive further requests during resetting")
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            PartialGraphStatus::Running(state) => state.poll_next_event(cx),
            PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting => Poll::Pending,
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, PartialGraphStatus::Unspecified), PartialGraphStatus::Running(state) => state);
        *self = PartialGraphStatus::Suspended(SuspendedPartialGraphState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        table_ids_to_clear: &mut HashSet<TableId>,
    ) -> BoxFuture<'static, ResetPartialGraphOutput> {
        match replace(self, PartialGraphStatus::Resetting) {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests {
                    if let TakeReceiverRequest::Remote(sender) = request {
                        let _ = sender.send(Err(anyhow!("partial graph reset").into()));
                    }
                }
                async move { ResetPartialGraphOutput { root_err: None } }.boxed()
            }
            PartialGraphStatus::Running(state) => {
                assert_eq!(partial_graph_id, state.partial_graph_id);
                info!(
                    %partial_graph_id,
                    "start partial graph reset from Running"
                );
                table_ids_to_clear.extend(state.table_ids.iter().copied());
                SuspendedPartialGraphState::new(state, None, completing_futures)
                    .reset()
                    .boxed()
            }
            PartialGraphStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(partial_graph_id, state.inner.partial_graph_id);
                info!(
                    %partial_graph_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start partial graph reset after suspended"
                );
                table_ids_to_clear.extend(state.inner.table_ids.iter().copied());
                state.reset().boxed()
            }
            PartialGraphStatus::Resetting => {
                unreachable!("should not reset twice");
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }
}

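// `PartialGraphStatus` transitions, as driven by this file (graph creation and
// the `ReceivedExchangeRequest -> Running` step happen elsewhere in the
// barrier worker):
//
//     Running   --suspend-->      Suspended
//     Running   --start_reset-->  Resetting
//     Suspended --start_reset-->  Resetting
//     Resetting --(reset future resolves in `next_event`)--> removed
//
// `Unspecified` is only a transient placeholder used while swapping the status
// in place.

/// Top-level barrier state of the worker: all partial graphs by id, plus the
/// join handles of reset tasks for graphs that are currently resetting.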
#[derive(Default)]
pub(in crate::task) struct ManagedBarrierState {
    pub(super) partial_graphs: HashMap<PartialGraphId, PartialGraphStatus>,
    pub(super) resetting_graphs:
        FuturesUnordered<JoinHandle<Vec<(PartialGraphId, ResetPartialGraphOutput)>>>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        partial_graph_id: PartialGraphId,
        actor_id: ActorId,
        err: StreamError,
    },
    PartialGraphsReset(Vec<(PartialGraphId, ResetPartialGraphOutput)>),
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        upstream_partial_graph_id: PartialGraphId,
        tx: permit::Sender,
    },
}

impl ManagedBarrierState {
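    /// Wait for the next event from any partial graph. Events from running
    /// graphs take priority; completed reset futures are then drained,
    /// removing the corresponding `Resetting` entries before a
    /// `PartialGraphsReset` event is returned.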
    pub(super) fn next_event(&mut self) -> impl Future<Output = ManagedBarrierStateEvent> + '_ {
        poll_fn(|cx| {
            for graph in self.partial_graphs.values_mut() {
                if let Poll::Ready(event) = graph.poll_next_event(cx) {
                    return Poll::Ready(event);
                }
            }
            if let Poll::Ready(Some(result)) = self.resetting_graphs.poll_next_unpin(cx) {
                let outputs = result.expect("failed to join resetting future");
                for (partial_graph_id, _) in &outputs {
                    let PartialGraphStatus::Resetting = self
                        .partial_graphs
                        .remove(partial_graph_id)
                        .expect("should exist")
                    else {
                        panic!("should be resetting")
                    };
                }
                return Poll::Ready(ManagedBarrierStateEvent::PartialGraphsReset(outputs));
            }
            Poll::Pending
        })
    }
}

/// Per-partial-graph barrier state manager. Handles barriers for one specific partial graph.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for an architecture overview.
pub(crate) struct PartialGraphState {
    partial_graph_id: PartialGraphId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_state: PartialGraphManagedBarrierState,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl PartialGraphState {
    /// Create a barrier manager state. This will be called only once.
    pub(super) fn new(
        partial_graph_id: PartialGraphId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(term_id, actor_manager.env.clone());
        Self {
            partial_graph_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_state: PartialGraphManagedBarrierState::new(&actor_manager),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_state: &self.graph_state,
        }
    }

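    /// Abort all actors of this partial graph and await their join handles,
    /// asserting that each task either finished normally or was cancelled.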
    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl PartialGraphState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphState {
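    /// Handle an `InjectBarrierRequest` from the meta service: register the
    /// barrier in the per-graph state, spawn the newly built actors, and issue
    /// the barrier to every pre-existing actor that needs to collect it.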
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        assert_eq!(self.partial_graph_id, request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };

        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        self.table_ids.extend(table_ids.iter().cloned());

        self.graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to be compatible with the unit tests. In the unit tests
        // that involve the local barrier manager, actors are spawned by the local test logic,
        // but we assume that there is an entry for each spawned actor in `actor_states`,
        // so under cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // The rx is kept alive here so that tx.send() will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(actor_id, barrier, tx, join_handle, None)
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to actors only after issuing it to the
        // graph, to ensure that `start_epoch` is called on the graph before the actors
        // receive the barrier.
        for &actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }

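    /// Route a take-receiver request from a downstream actor to the output
    /// request channel of `upstream_actor_id`, buffering the request if the
    /// upstream actor has not been spawned yet.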
    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: TakeReceiverRequest,
    ) {
        let request = match request {
            TakeReceiverRequest::Remote(result_sender) => {
                let (tx, rx) = channel_from_config(self.local_barrier_manager.env.global_config());
                let _ = result_sender.send(Ok(rx));
                NewOutputRequest::Remote(tx)
            }
            TakeReceiverRequest::Local(tx) => NewOutputRequest::Local(tx),
        };
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                actor_id,
                err,
                partial_graph_id: self.partial_graph_id,
            });
        }
        // yield some pending collected epochs
        {
            if let Some(barrier) = self.graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    barrier,
                    partial_graph_id: self.partial_graph_id,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some(barrier) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    upstream_partial_graph_id,
                    tx,
                } => {
                    return Poll::Ready(ManagedBarrierStateEvent::RegisterLocalUpstreamOutput {
                        actor_id,
                        upstream_actor_id,
                        upstream_partial_graph_id,
                        tx,
                    });
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(self.graph_state.may_have_collected_all().is_none());
        Poll::Pending
    }
}

impl PartialGraphState {
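    /// Collect `epoch` from `actor_id`, updating both the per-actor and the
    /// per-graph state and removing the actor once it has fully stopped.
    /// Returns the barrier that has now been collected from all actors, if any.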
    #[must_use]
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) -> Option<Barrier> {
        let is_finished = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        self.graph_state.collect(actor_id, epoch);
        self.graph_state.may_have_collected_all()
    }

    pub(super) fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        self.graph_state.pop_barrier_to_complete(prev_epoch)
    }

    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if no actor error has been received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Report that a source has finished listing for a specific epoch.
    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        // Only record the report if the actor is still tracked and the epoch is
        // one of its inflight barriers.
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbListFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
            );
        }
    }

    /// Report that a source has finished loading for a specific epoch.
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        // Only record the report if the actor is still tracked and the epoch is
        // one of its inflight barriers.
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbLoadFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
            );
        }
    }

    /// Report that a table has finished refreshing for a specific epoch.
    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    ) {
        // Only record the report if the actor is still tracked and the epoch is
        // one of its inflight barriers.
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        if !actor_state.inflight_barriers.contains(&epoch.prev) {
            warn!(
                ?epoch,
                %actor_id,
                %table_id,
                inflight_barriers = ?actor_state.inflight_barriers,
                "ignore refresh finished table: epoch not found in inflight_barriers"
            );
            return;
        }
        self.graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        self.graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}

impl PartialGraphManagedBarrierState {
    /// Transform an `Issued` barrier's state to `AllCollected` once the barrier
    /// has been collected from all of its remaining actors, assembling the
    /// per-epoch payload that will later be reported on completion. Returns the
    /// barrier that has just been fully collected, if any.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}

pub(crate) struct BarrierToComplete {
    pub barrier: Barrier,
    pub table_ids: Option<HashSet<TableId>>,
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    pub truncate_tables: Vec<TableId>,
    pub refresh_finished_tables: Vec<TableId>,
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
        let actor_id = actor_id.into();
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // There is no state for this epoch, which can only happen when the barrier has
                // not been injected by the barrier manager, or the barrier message is blocked
                // at the `RemoteInput` side waiting for injection. In either case, no actor
                // should be able to collect the barrier yet.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        }

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into(), 3.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into(), 3.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}