risingwave_stream/task/barrier_worker/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::SourceId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of local barrier manager.
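/// A barrier for an epoch starts as `Issued`, tracking the actors that still have to report
/// it; once `remaining_actors` is drained via [`PartialGraphManagedBarrierState::collect`],
/// [`PartialGraphManagedBarrierState::may_have_collected_all`] turns it into `AllCollected`,
/// carrying the progress reports accumulated for that epoch.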
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    },
}

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::InjectBarrierRequest;

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::cdc_progress::CdcTableBackfillState;

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Graph {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

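/// Where an actor stands in its barrier lifecycle, as tracked by [`InflightActorState`].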
enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` -> partial graph id
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

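    /// Record that this actor has collected the barrier whose `epoch.prev` is the oldest
    /// inflight one. Returns the partial graph the barrier belongs to, and whether the actor
    /// has now finished (no inflight barriers left after a stop barrier was issued), in which
    /// case the caller removes its state.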
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

/// Part of [`DatabaseManagedBarrierState`]
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`; the value is the [`BarrierState`] of the barrier whose
    /// `epoch.prev` equals the key.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// The process of progress reporting is as follows:
    /// 1. updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`]
    /// 2. converted to [`ManagedBarrierStateInner`] in [`Self::may_have_collected_all`]
    /// 3. handled by [`Self::pop_barrier_to_complete`]
    /// 4. put in [`crate::task::barrier_worker::BarrierCompleteResult`] and reported to meta.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    /// Record the source list finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source. The map key is epoch and the value is
    /// a list of pb messages reported by actors.
    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    /// Record the source load finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source. The map key is epoch and the value is
    /// a list of pb messages reported by actors.
    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// Record the tables to truncate for each epoch of concurrent checkpoints.
    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    /// Record the tables that have finished refresh for each epoch of concurrent checkpoints.
    /// Used for materialized view refresh completion reporting.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}
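
// Typical flow for one epoch, as exercised by the tests at the bottom of this file:
// `transform_to_issued` marks the barrier as injected, each actor reports via `collect`,
// `may_have_collected_all` yields the barrier once `remaining_actors` is empty, and
// `pop_barrier_to_complete` hands the accumulated reports off for completion.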

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            create_mview_progress: Default::default(),
            list_finished_source_ids: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            truncate_tables: Default::default(),
            refresh_finished_tables: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

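/// Status of a database managed by the barrier worker. Transitions in this file:
/// `Running -> Suspended` via [`Self::suspend`], and any non-`Unspecified` status ->
/// `Resetting` via [`Self::start_reset`]; `Unspecified` is only a transient placeholder
/// used while the status is being `replace`d in place.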
pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Temporary placeholder.
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    %database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
526                    "should have been clear when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    %database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    %database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

#[derive(Default)]
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
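    /// Poll every database (in arbitrary `HashMap` order) for its next event, yielding the
    /// first one that is ready together with the id of the database it came from.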
    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

/// Per-database barrier state manager. Handles barriers for one specific database.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create a barrier manager state. This will be called only once.
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: Default::default(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl DatabaseManagedBarrierState {
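    /// Handle an `InjectBarrierRequest` from meta: mark the barrier as issued in the target
    /// partial graph, spawn any newly built actors, and then forward the barrier to every
    /// existing actor that has to collect it.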
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to be compatible with the unit tests. In unit tests that
        // involve the local barrier manager, actors are spawned by the local test logic, but we
        // assume that there is an entry for each spawned actor in `actor_states`, so under
        // cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // Keep `rx` alive in the spawned task so that `tx.send()` will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(
                                    actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to the actors only after issuing it to the
        // graph, to ensure that `start_epoch` is called on the graph before the actors receive
        // the barrier.
        for &actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.global_config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
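    /// Polling order: actor failures first, then graphs that may have fully collected a
    /// barrier, then queued [`LocalBarrierEvent`]s; returns `Poll::Pending` only once no
    /// graph is left with a fully collected but unreported barrier.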
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // yield some pending collected epochs
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
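    /// Record that `actor_id` has collected the barrier for `epoch`, dropping the actor's
    /// state if it has finished. Returns `Some((partial_graph_id, barrier))` when this
    /// collection completes the barrier for its partial graph.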
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    #[allow(clippy::type_complexity)]
    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> BarrierToComplete {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

    /// Collect actor errors for a short period (up to 3 seconds) and find the one that might be the root cause.
    ///
    /// Returns `None` if there's no actor error received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Report that a source has finished listing for a specific epoch
    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbListFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
            );
        }
    }

    /// Report that a source has finished loading for a specific epoch
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbLoadFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
            );
        }
    }

    /// Report that a table has finished refreshing for a specific epoch
    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    ) {
        // Find the correct partial graph state by matching the actor's partial graph id
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) else {
            let inflight_barriers = actor_state.inflight_barriers.keys().collect::<Vec<_>>();
            warn!(
                ?epoch,
                %actor_id,
                %table_id,
                ?inflight_barriers,
                "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
            );
            return;
        };
        let Some(graph_state) = self.graph_states.get_mut(partial_graph_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: graph_state not found"
            );
            return;
        };
        graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}

impl PartialGraphManagedBarrierState {
    /// Called whenever barrier state may have changed, to check the barriers in epoch order:
    /// once an `Issued` barrier has been collected from all of its actors, its state is
    /// transformed to `AllCollected`, taking along the progress reports accumulated for that
    /// epoch.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}

pub(crate) struct BarrierToComplete {
    pub barrier: Barrier,
    pub table_ids: Option<HashSet<TableId>>,
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    pub truncate_tables: Vec<TableId>,
    pub refresh_finished_tables: Vec<TableId>,
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
        let actor_id = actor_id.into();
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // A missing entry can only mean that the barrier has not yet been injected by
                // the barrier manager, or that the barrier message is still blocked at the
                // `RemoteInput` side waiting for injection. In either case no actor should be
                // able to collect it, so reaching here is a bug.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

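        // Track the table set synced by the previous barrier so that a `Checkpoint` barrier
        // can report the tables of the epoch range it seals; `Initial` and non-checkpoint
        // barriers only initialize or advance `prev_barrier_table_ids` and yield `None`.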
        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        }
1380
1381        self.epoch_barrier_state_map.insert(
1382            barrier.epoch.prev,
1383            BarrierState {
1384                barrier: barrier.clone(),
1385                inner: ManagedBarrierStateInner::Issued(IssuedState {
1386                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
1387                    barrier_inflight_latency: timer,
1388                }),
1389                table_ids,
1390            },
1391        );
1392    }
1393
1394    #[cfg(test)]
1395    async fn pop_next_completed_epoch(&mut self) -> u64 {
1396        if let Some(barrier) = self.may_have_collected_all() {
1397            self.pop_barrier_to_complete(barrier.epoch.prev);
1398            return barrier.epoch.prev;
1399        }
1400        pending().await
1401    }
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406    use std::collections::HashSet;
1407
1408    use risingwave_common::util::epoch::test_epoch;
1409
1410    use crate::executor::Barrier;
1411    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;
1412
1413    #[tokio::test]
1414    async fn test_managed_state_add_actor() {
1415        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1416        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1417        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1418        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1419        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into()]);
1420        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into()]);
1421        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into(), 3.into()]);
1422        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1423        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1424        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1425        managed_barrier_state.collect(1, barrier1.epoch);
1426        managed_barrier_state.collect(2, barrier1.epoch);
1427        assert_eq!(
1428            managed_barrier_state.pop_next_completed_epoch().await,
1429            test_epoch(0)
1430        );
1431        assert_eq!(
1432            managed_barrier_state
1433                .epoch_barrier_state_map
1434                .first_key_value()
1435                .unwrap()
1436                .0,
1437            &test_epoch(1)
1438        );
1439        managed_barrier_state.collect(1, barrier2.epoch);
1440        managed_barrier_state.collect(1, barrier3.epoch);
1441        managed_barrier_state.collect(2, barrier2.epoch);
1442        assert_eq!(
1443            managed_barrier_state.pop_next_completed_epoch().await,
1444            test_epoch(1)
1445        );
1446        assert_eq!(
1447            managed_barrier_state
1448                .epoch_barrier_state_map
1449                .first_key_value()
1450                .unwrap()
1451                .0,
1452            &test_epoch(2)
1453        );
1454        managed_barrier_state.collect(2, barrier3.epoch);
1455        managed_barrier_state.collect(3, barrier3.epoch);
1456        assert_eq!(
1457            managed_barrier_state.pop_next_completed_epoch().await,
1458            test_epoch(2)
1459        );
1460        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1461    }
1462
1463    #[tokio::test]
1464    async fn test_managed_state_stop_actor() {
1465        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1466        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1467        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1468        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1469        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]);
1470        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into(), 3.into()]);
1471        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into()]);
1472        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1473        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1474        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1475
1476        managed_barrier_state.collect(1, barrier1.epoch);
1477        managed_barrier_state.collect(1, barrier2.epoch);
1478        managed_barrier_state.collect(1, barrier3.epoch);
1479        managed_barrier_state.collect(2, barrier1.epoch);
1480        managed_barrier_state.collect(2, barrier2.epoch);
1481        managed_barrier_state.collect(2, barrier3.epoch);
1482        assert_eq!(
1483            managed_barrier_state
1484                .epoch_barrier_state_map
1485                .first_key_value()
1486                .unwrap()
1487                .0,
1488            &0
1489        );
1490        managed_barrier_state.collect(3, barrier1.epoch);
1491        managed_barrier_state.collect(3, barrier2.epoch);
1492        assert_eq!(
1493            managed_barrier_state
1494                .epoch_barrier_state_map
1495                .first_key_value()
1496                .unwrap()
1497                .0,
1498            &0
1499        );
1500        managed_barrier_state.collect(4, barrier1.epoch);
1501        assert_eq!(
1502            managed_barrier_state.pop_next_completed_epoch().await,
1503            test_epoch(0)
1504        );
1505        assert_eq!(
1506            managed_barrier_state.pop_next_completed_epoch().await,
1507            test_epoch(1)
1508        );
1509        assert_eq!(
1510            managed_barrier_state.pop_next_completed_epoch().await,
1511            test_epoch(2)
1512        );
1513        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1514    }
1515}