risingwave_stream/task/barrier_worker/
managed_state.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
16use std::fmt::{Debug, Display, Formatter};
17use std::future::{Future, pending, poll_fn};
18use std::mem::replace;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use futures::future::BoxFuture;
25use futures::stream::{FuturesOrdered, FuturesUnordered};
26use futures::{FutureExt, StreamExt};
27use prometheus::HistogramTimer;
28use risingwave_common::catalog::TableId;
29use risingwave_common::id::SourceId;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_pb::stream_plan::barrier::BarrierKind;
32use risingwave_pb::stream_service::barrier_complete_response::{
33    PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
34    PbListFinishedSource, PbLoadFinishedSource,
35};
36use risingwave_storage::StateStoreImpl;
37use tokio::sync::mpsc;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
39use tokio::task::JoinHandle;
40
41use crate::error::{StreamError, StreamResult};
42use crate::executor::Barrier;
43use crate::executor::monitor::StreamingMetrics;
44use crate::task::progress::BackfillState;
45use crate::task::{
46    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
47    StreamActorManager, UpDownActorIds,
48};
49
/// Per-barrier collection state while the barrier is still being collected.
struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    /// Histogram timer started when the barrier was issued; measures
    /// barrier in-flight latency (observed when the timer is consumed/dropped).
    pub barrier_inflight_latency: HistogramTimer,
}
56
57impl Debug for IssuedState {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("IssuedState")
60            .field("remaining_actors", &self.remaining_actors)
61            .finish()
62    }
63}
64
/// The state machine of local barrier manager.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors
    AllCollected {
        /// Create-mview backfill progress gathered for this barrier.
        create_mview_progress: Vec<PbCreateMviewProgress>,
        /// Sources that reported finishing their list phase (refreshable batch source).
        list_finished_source_ids: Vec<PbListFinishedSource>,
        /// Sources that reported finishing their load phase (refreshable batch source).
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        /// CDC table backfill progress gathered for this barrier.
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        /// CDC sources that reported having updated their offset.
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        /// Tables to truncate at this barrier.
        truncate_tables: Vec<TableId>,
        /// Tables whose refresh finished at this barrier.
        refresh_finished_tables: Vec<TableId>,
    },
}
82
/// State tracked for one in-flight barrier (one epoch).
#[derive(Debug)]
struct BarrierState {
    /// The barrier being collected.
    barrier: Barrier,
    /// Only be `Some(_)` when `barrier.kind` is `Checkpoint`
    table_ids: Option<HashSet<TableId>>,
    /// Collection progress: still `Issued`, or `AllCollected` with the reports.
    inner: ManagedBarrierStateInner,
}
90
91use risingwave_common::must_match;
92use risingwave_pb::id::FragmentId;
93use risingwave_pb::stream_service::InjectBarrierRequest;
94
95use crate::executor::exchange::permit;
96use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
97use crate::task::barrier_worker::{ScoredStreamError, TakeReceiverRequest};
98use crate::task::cdc_progress::CdcTableBackfillState;
99
/// Borrowed snapshot of a partial graph's barrier state, rendered via `Display`
/// for debugging purposes.
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    /// Ids of the actors currently registered in this partial graph.
    running_actors: BTreeSet<ActorId>,
    /// The per-graph barrier state to render.
    graph_state: &'a PartialGraphManagedBarrierState,
}
104
105impl Display for ManagedBarrierStateDebugInfo<'_> {
106    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107        write!(f, "running_actors: ")?;
108        for actor_id in &self.running_actors {
109            write!(f, "{}, ", actor_id)?;
110        }
111        {
112            writeln!(f, "graph states")?;
113            write!(f, "{}", self.graph_state)?;
114        }
115        Ok(())
116    }
117}
118
/// Debug rendering of all in-flight epochs and pending progress reports.
impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Tracks the previously printed epoch so that actors already listed
        // for the previous (still-issued) epoch can be summarized instead of
        // repeated. 0 means "no previous epoch" (epochs are assumed non-zero).
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    // First epoch, or previous epoch already fully collected:
                    // print the full remaining-actor list.
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        // Append any pending create-mview progress reports.
        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}
182
/// Barrier-issuing status of a single in-flight actor.
enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier.
    /// Holds the `prev_epoch` of the most recently issued barrier.
    Running(u64),
}
189
190impl InflightActorStatus {
191    fn max_issued_epoch(&self) -> u64 {
192        match self {
193            InflightActorStatus::Running(epoch) => *epoch,
194            InflightActorStatus::IssuedFirst(issued_barriers) => {
195                issued_barriers.last().expect("non-empty").epoch.prev
196            }
197        }
198    }
199}
200
/// Tracking state for a single actor spawned on this worker.
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    /// Senders registered via `register_barrier_sender`; every issued barrier
    /// is forwarded to each of them.
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch`. `push_back` and `pop_front`
    pub(in crate::task) inflight_barriers: VecDeque<u64>,
    /// Whether the actor has collected its first barrier yet.
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier
    is_stopping: bool,

    /// Channel used to forward new output (exchange) requests to the actor.
    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    /// Handle of the actor's main task; aborted and awaited on teardown.
    join_handle: JoinHandle<()>,
    /// Handle of the optional monitor task spawned alongside the actor.
    monitor_task_handle: Option<JoinHandle<()>>,
}
214
impl InflightActorState {
    /// Creates tracking state for a newly spawned actor, with `initial_barrier`
    /// recorded as its first issued (and not yet collected) barrier.
    pub(super) fn start(
        actor_id: ActorId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            // The initial barrier is in flight until the actor collects it.
            inflight_barriers: VecDeque::from_iter([initial_barrier.epoch.prev]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

    /// Issues a new barrier to this actor: forwards it to all registered
    /// senders and records it as in-flight.
    ///
    /// Barriers must be issued in strictly increasing `prev_epoch` order.
    /// `is_stop` marks this barrier as the one stopping the actor; issuing
    /// another barrier afterwards would violate the assertion below.
    pub(super) fn issue_barrier(&mut self, barrier: &Barrier, is_stop: bool) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        // In-flight barriers are kept in issue order, so the new epoch must be
        // greater than the newest one still in flight.
        if let Some(prev_epoch) = self.inflight_barriers.back() {
            assert!(*prev_epoch < barrier.epoch.prev);
        }
        self.inflight_barriers.push_back(barrier.epoch.prev);

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                // First barrier not collected yet: buffer the barrier so it
                // can be replayed to senders registered later.
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                // Track the max issued epoch.
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

    /// Marks the barrier of `epoch` as collected by this actor.
    ///
    /// Barriers must be collected in issue order (FIFO on `inflight_barriers`).
    /// Returns `true` when the actor has no more in-flight barriers AND was
    /// issued a stop barrier, i.e. the actor has finished.
    pub(super) fn collect(&mut self, epoch: EpochPair) -> bool {
        let prev_epoch = self.inflight_barriers.pop_front().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                // The first collected barrier must be the first issued one;
                // once collected, the buffered barriers are no longer needed
                // and we transition to `Running` with the max issued epoch.
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }

        self.inflight_barriers.is_empty() && self.is_stopping
    }
}
288
/// Part of [`PartialGraphState`]
pub(crate) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`, and the first value is `curr_epoch`
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    /// Epoch and table ids of the previous barrier.
    /// NOTE(review): usage is not visible in this chunk — presumably used to
    /// validate table-id continuity across checkpoint barriers; confirm.
    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// The process of progress reporting is as follows:
    /// 1. updated by [`crate::task::barrier_manager::CreateMviewProgressReporter::update`]
    /// 2. converted to [`ManagedBarrierStateInner`] in [`Self::may_have_collected_all`]
    /// 3. handled by [`Self::pop_barrier_to_complete`]
    /// 4. put in [`crate::task::barrier_worker::BarrierCompleteResult`] and reported to meta.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    /// Record the source list finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source. The map key is epoch and the value is
    /// a list of pb messages reported by actors.
    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    /// Record the source load finished reports for each epoch of concurrent checkpoints.
    /// Used for refreshable batch source. The map key is epoch and the value is
    /// a list of pb messages reported by actors.
    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    /// CDC table backfill progress per epoch, keyed by reporting actor.
    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    /// Record CDC source offset updated reports for each epoch of concurrent checkpoints.
    /// Used to track when CDC sources have updated their offset at least once.
    pub(crate) cdc_source_offset_updated: HashMap<u64, Vec<PbCdcSourceOffsetUpdated>>,

    /// Record the tables to truncate for each epoch of concurrent checkpoints.
    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    /// Record the tables that have finished refresh for each epoch of concurrent checkpoints.
    /// Used for materialized view refresh completion reporting.
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    /// Handle to the state store shared with the actor manager's environment.
    state_store: StateStoreImpl,

    /// Streaming metrics registry (e.g. barrier in-flight latency histograms).
    streaming_metrics: Arc<StreamingMetrics>,
}
333
334impl PartialGraphManagedBarrierState {
335    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
336        Self::new_inner(
337            actor_manager.env.state_store(),
338            actor_manager.streaming_metrics.clone(),
339        )
340    }
341
342    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
343        Self {
344            epoch_barrier_state_map: Default::default(),
345            prev_barrier_table_ids: None,
346            create_mview_progress: Default::default(),
347            list_finished_source_ids: Default::default(),
348            load_finished_source_ids: Default::default(),
349            cdc_table_backfill_progress: Default::default(),
350            cdc_source_offset_updated: Default::default(),
351            truncate_tables: Default::default(),
352            refresh_finished_tables: Default::default(),
353            state_store,
354            streaming_metrics,
355        }
356    }
357
358    #[cfg(test)]
359    pub(crate) fn for_test() -> Self {
360        Self::new_inner(
361            StateStoreImpl::for_test(),
362            Arc::new(StreamingMetrics::unused()),
363        )
364    }
365
366    pub(super) fn is_empty(&self) -> bool {
367        self.epoch_barrier_state_map.is_empty()
368    }
369}
370
/// A partial graph that has been suspended (after a failure or pending reset),
/// keeping its state around until the reset is actually performed.
pub(crate) struct SuspendedPartialGraphState {
    /// When the graph was suspended; used to log the suspension duration.
    pub(super) suspend_time: Instant,
    /// The suspended graph state; its actors are aborted on reset.
    inner: PartialGraphState,
    /// The failure that caused the suspension: the failing actor id (if known)
    /// plus the error.
    failure: Option<(Option<ActorId>, StreamError)>,
}
376
377impl SuspendedPartialGraphState {
378    fn new(
379        state: PartialGraphState,
380        failure: Option<(Option<ActorId>, StreamError)>,
381        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
382    ) -> Self {
383        Self {
384            suspend_time: Instant::now(),
385            inner: state,
386            failure,
387        }
388    }
389
390    async fn reset(mut self) -> ResetPartialGraphOutput {
391        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
392        self.inner.abort_and_wait_actors().await;
393        ResetPartialGraphOutput { root_err }
394    }
395}
396
/// Outcome of resetting a partial graph.
pub(crate) struct ResetPartialGraphOutput {
    /// The error judged most likely to be the root cause of the failure, if any.
    pub(crate) root_err: Option<ScoredStreamError>,
}
400
/// Lifecycle of a partial graph on this worker.
pub(in crate::task) enum PartialGraphStatus {
    /// Exchange (take-receiver) requests that arrived before the graph was
    /// created; buffered until then, failed on abort/reset.
    ReceivedExchangeRequest(Vec<(UpDownActorIds, TakeReceiverRequest)>),
    /// The graph is running normally.
    Running(PartialGraphState),
    /// The graph is suspended after a failure, awaiting a reset.
    Suspended(SuspendedPartialGraphState),
    /// The graph is being reset; its actors are being torn down.
    Resetting,
    /// temporary place holder
    Unspecified,
}
409
impl PartialGraphStatus {
    /// Aborts the partial graph regardless of its current state.
    ///
    /// Pending exchange requests are failed, and running/suspended actors are
    /// force-stopped and awaited. `Resetting` is a no-op (the reset future is
    /// already tearing the actors down).
    pub(crate) async fn abort(&mut self) {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests.drain(..) {
                    if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                        // Best-effort notification; the requester may be gone already.
                        let _ = result_sender.send(Err(anyhow!("partial graph aborted").into()));
                    }
                }
            }
            PartialGraphStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Suspended(SuspendedPartialGraphState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Resetting => {}
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Returns the inner state to serve a request on, or `None` while the
    /// graph is suspended (requests are dropped until the graph is reset).
    pub(crate) fn state_for_request(&mut self) -> Option<&mut PartialGraphState> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            PartialGraphStatus::Running(state) => Some(state),
            PartialGraphStatus::Suspended(_) => None,
            PartialGraphStatus::Resetting => {
                unreachable!("should not receive further request during cleaning")
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Polls for the next event. Only a `Running` graph yields events; the
    /// other states stay pending.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            PartialGraphStatus::Running(state) => state.poll_next_event(cx),
            PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting => Poll::Pending,
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Transitions `Running` -> `Suspended`, recording the failure that caused
    /// the suspension and discarding any in-progress completing futures.
    ///
    /// Panics (via `must_match!`) if the current state is not `Running`.
    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, PartialGraphStatus::Unspecified), PartialGraphStatus::Running(state) => state);
        *self = PartialGraphStatus::Suspended(SuspendedPartialGraphState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    /// Transitions into `Resetting` and returns a future that performs the
    /// reset, yielding the root error (if any) once all actors have exited.
    ///
    /// Table ids owned by the graph are appended to `table_ids_to_clear` so
    /// the caller can clear the corresponding storage state.
    pub(super) fn start_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        table_ids_to_clear: &mut HashSet<TableId>,
    ) -> BoxFuture<'static, ResetPartialGraphOutput> {
        match replace(self, PartialGraphStatus::Resetting) {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests {
                    if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                        let _ = result_sender.send(Err(anyhow!("partial graph reset").into()));
                    }
                }
                // No actors were ever started, so there is nothing to wait for.
                async move { ResetPartialGraphOutput { root_err: None } }.boxed()
            }
            PartialGraphStatus::Running(state) => {
                assert_eq!(partial_graph_id, state.partial_graph_id);
                info!(
                    %partial_graph_id,
                    "start partial graph reset from Running"
                );
                table_ids_to_clear.extend(state.table_ids.iter().copied());
                // Wrap in a suspended state first so the reset logic is shared
                // with the `Suspended` branch below.
                SuspendedPartialGraphState::new(state, None, completing_futures)
                    .reset()
                    .boxed()
            }
            PartialGraphStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been clear when suspended"
                );
                assert_eq!(partial_graph_id, state.inner.partial_graph_id);
                info!(
                    %partial_graph_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start partial graph reset after suspended"
                );
                table_ids_to_clear.extend(state.inner.table_ids.iter().copied());
                state.reset().boxed()
            }
            PartialGraphStatus::Resetting => {
                unreachable!("should not reset for twice");
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }
}
526
/// Top-level barrier state: all partial graphs managed by this worker.
#[derive(Default)]
pub(in crate::task) struct ManagedBarrierState {
    /// Status of each partial graph, keyed by its id.
    pub(super) partial_graphs: HashMap<PartialGraphId, PartialGraphStatus>,
    /// Join handles of in-progress partial-graph resets; polled in
    /// [`Self::next_event`].
    pub(super) resetting_graphs:
        FuturesUnordered<JoinHandle<Vec<(PartialGraphId, ResetPartialGraphOutput)>>>,
}
533
/// Events surfaced by [`ManagedBarrierState::next_event`].
pub(super) enum ManagedBarrierStateEvent {
    /// A barrier has been collected for the given partial graph.
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    /// An actor of the given partial graph reported a failure.
    ActorError {
        partial_graph_id: PartialGraphId,
        actor_id: ActorId,
        err: StreamError,
    },
    /// One or more partial graphs have finished resetting.
    PartialGraphsReset(Vec<(PartialGraphId, ResetPartialGraphOutput)>),
    /// Request to register a local upstream actor's output channel.
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        upstream_partial_graph_id: PartialGraphId,
        tx: permit::Sender,
    },
}
552
553impl ManagedBarrierState {
554    pub(super) fn next_event(&mut self) -> impl Future<Output = ManagedBarrierStateEvent> + '_ {
555        poll_fn(|cx| {
556            for graph in self.partial_graphs.values_mut() {
557                if let Poll::Ready(event) = graph.poll_next_event(cx) {
558                    return Poll::Ready(event);
559                }
560            }
561            if let Poll::Ready(Some(result)) = self.resetting_graphs.poll_next_unpin(cx) {
562                let outputs = result.expect("failed to join resetting future");
563                for (partial_graph_id, _) in &outputs {
564                    let PartialGraphStatus::Resetting = self
565                        .partial_graphs
566                        .remove(partial_graph_id)
567                        .expect("should exist")
568                    else {
569                        panic!("should be resetting")
570                    };
571                }
572                return Poll::Ready(ManagedBarrierStateEvent::PartialGraphsReset(outputs));
573            }
574            Poll::Pending
575        })
576    }
577}
578
/// Per-partial-graph barrier state manager. Handles barriers for one specific partial graph.
/// Part of [`ManagedBarrierState`] in [`super::LocalBarrierWorker`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct PartialGraphState {
    partial_graph_id: PartialGraphId,
    /// Tracking state of each actor spawned for this graph.
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    /// Output requests that arrived before the target actor was built;
    /// flushed to the actor once it is spawned.
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    /// Epoch-indexed barrier collection state of the graph.
    pub(crate) graph_state: PartialGraphManagedBarrierState,

    /// All table ids this graph has been asked to sync so far.
    table_ids: HashSet<TableId>,

    /// Shared facilities for spawning actors.
    actor_manager: Arc<StreamActorManager>,

    /// Handle given to spawned actors of this graph.
    pub(super) local_barrier_manager: LocalBarrierManager,

    /// Receives barrier events from actors of this graph.
    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    /// Receives failures reported by actors of this graph.
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}
600
601impl PartialGraphState {
602    /// Create a barrier manager state. This will be called only once.
603    pub(super) fn new(
604        partial_graph_id: PartialGraphId,
605        term_id: String,
606        actor_manager: Arc<StreamActorManager>,
607    ) -> Self {
608        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
609            LocalBarrierManager::new(term_id, actor_manager.env.clone());
610        Self {
611            partial_graph_id,
612            actor_states: Default::default(),
613            actor_pending_new_output_requests: Default::default(),
614            graph_state: PartialGraphManagedBarrierState::new(&actor_manager),
615            table_ids: Default::default(),
616            actor_manager,
617            local_barrier_manager,
618            barrier_event_rx,
619            actor_failure_rx,
620        }
621    }
622
623    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
624        ManagedBarrierStateDebugInfo {
625            running_actors: self.actor_states.keys().cloned().collect(),
626            graph_state: &self.graph_state,
627        }
628    }
629
630    async fn abort_and_wait_actors(&mut self) {
631        for (actor_id, state) in &self.actor_states {
632            tracing::debug!("force stopping actor {}", actor_id);
633            state.join_handle.abort();
634            if let Some(monitor_task_handle) = &state.monitor_task_handle {
635                monitor_task_handle.abort();
636            }
637        }
638
639        for (actor_id, state) in self.actor_states.drain() {
640            tracing::debug!("join actor {}", actor_id);
641            let result = state.join_handle.await;
642            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
643        }
644    }
645}
646
647impl InflightActorState {
648    pub(super) fn register_barrier_sender(
649        &mut self,
650        tx: mpsc::UnboundedSender<Barrier>,
651    ) -> StreamResult<()> {
652        match &self.status {
653            InflightActorStatus::IssuedFirst(pending_barriers) => {
654                for barrier in pending_barriers {
655                    tx.send(barrier.clone()).map_err(|_| {
656                        StreamError::barrier_send(
657                            barrier.clone(),
658                            self.actor_id,
659                            "failed to send pending barriers to newly registered sender",
660                        )
661                    })?;
662                }
663                self.barrier_senders.push(tx);
664            }
665            InflightActorStatus::Running(_) => {
666                unreachable!("should not register barrier sender when entering Running status")
667            }
668        }
669        Ok(())
670    }
671}
672
673impl PartialGraphState {
674    pub(super) fn register_barrier_sender(
675        &mut self,
676        actor_id: ActorId,
677        tx: mpsc::UnboundedSender<Barrier>,
678    ) -> StreamResult<()> {
679        self.actor_states
680            .get_mut(&actor_id)
681            .expect("should exist")
682            .register_barrier_sender(tx)
683    }
684}
685
686impl PartialGraphState {
    /// Handles an `InjectBarrierRequest`: registers the barrier with the
    /// graph-level state, builds and spawns any new actors carried by the
    /// request, and finally issues the barrier to all pre-existing actors
    /// that must collect it.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        assert_eq!(self.partial_graph_id, request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        // Whether this barrier stops the given actor.
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };

        // Accumulate all table ids this graph has been asked to sync.
        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        self.table_ids.extend(table_ids.iter().cloned());

        // Issue to the graph-level state BEFORE issuing to actors; see the
        // ordering note above the per-actor loop below.
        self.graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        // Build and spawn the new actors carried by this request; the barrier
        // is recorded as their initial barrier via `InflightActorState::start`.
        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    // Share the fragment's plan node across its actors.
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            // A newly built actor cannot be stopped by the same barrier, must
            // be unique in the request, and must be among the actors to collect.
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            // Flush output requests that arrived before this actor was built.
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to be compatible with the unit test. In the unit tests that involve local barrier manager,
        // actors are spawned in the local test logic, but we assume that there is an entry for each spawned actor in `actor_states`,
        // so under cfg!(test) we add a dummy entry for each new actor.
        if cfg!(test) {
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // The rx is spawned so that tx.send() will not fail.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(actor_id, barrier, tx, join_handle, None,)
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

        // Note: it's important to issue barrier to actor after issuing to graph to ensure that
        // we call `start_epoch` on the graph before the actors receive the barrier
        for &actor_id in &request.actor_ids_to_collect {
            // New actors already received the barrier as their initial barrier.
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }
800
801    pub(super) fn new_actor_output_request(
802        &mut self,
803        actor_id: ActorId,
804        upstream_actor_id: ActorId,
805        request: TakeReceiverRequest,
806    ) {
807        let request = match request {
808            TakeReceiverRequest::Remote {
809                result_sender,
810                upstream_fragment_id,
811            } => {
812                let upstream_fragment_id_str = upstream_fragment_id.to_string();
813                let fragment_channel_buffered_bytes = self
814                    .actor_manager
815                    .streaming_metrics
816                    .fragment_channel_buffered_bytes
817                    .with_guarded_label_values(&[&upstream_fragment_id_str]);
818                let (tx, rx) = permit::channel_from_config_with_metrics(
819                    self.local_barrier_manager.env.global_config(),
820                    permit::ChannelMetrics {
821                        sender_actor_channel_buffered_bytes: fragment_channel_buffered_bytes
822                            .clone(),
823                        receiver_actor_channel_buffered_bytes: fragment_channel_buffered_bytes,
824                    },
825                );
826                let _ = result_sender.send(Ok(rx));
827                NewOutputRequest::Remote(tx)
828            }
829            TakeReceiverRequest::Local(tx) => NewOutputRequest::Local(tx),
830        };
831        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
832            let _ = actor.new_output_request_tx.send((actor_id, request));
833        } else {
834            self.actor_pending_new_output_requests
835                .entry(upstream_actor_id)
836                .or_default()
837                .push((actor_id, request));
838        }
839    }
840
    /// Handles [`LocalBarrierEvent`] from [`crate::task::barrier_manager::LocalBarrierManager`].
    ///
    /// Polls, in priority order:
    /// 1. actor failures from `actor_failure_rx`;
    /// 2. barriers that may already be fully collected in `graph_state`;
    /// 3. pending `LocalBarrierEvent`s from `barrier_event_rx`.
    ///
    /// Returns `Poll::Ready` with the first event produced, or `Poll::Pending`
    /// once both channels are drained and no barrier is fully collected.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        // Actor failures take priority over everything else.
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                actor_id,
                err,
                partial_graph_id: self.partial_graph_id,
            });
        }
        // yield some pending collected epochs
        {
            if let Some(barrier) = self.graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    barrier,
                    partial_graph_id: self.partial_graph_id,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                // An actor collected a barrier; if that completes the barrier for
                // the whole graph, surface it immediately.
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some(barrier) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                // The progress/report events below are recorded in place and do
                // not produce a `ManagedBarrierStateEvent` by themselves.
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    // A failed registration is reported as an actor error.
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                // Forwarded to the caller, which owns cross-graph routing.
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    upstream_partial_graph_id,
                    tx,
                } => {
                    return Poll::Ready(ManagedBarrierStateEvent::RegisterLocalUpstreamOutput {
                        actor_id,
                        upstream_actor_id,
                        upstream_partial_graph_id,
                        tx,
                    });
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
                LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
                    epoch,
                    actor_id,
                    source_id,
                } => {
                    self.report_cdc_source_offset_updated(epoch, actor_id, source_id);
                }
            }
        }

        // Any fully-collected barrier would have been yielded above.
        debug_assert!(self.graph_state.may_have_collected_all().is_none());
        Poll::Pending
    }
960}
961
962impl PartialGraphState {
963    #[must_use]
964    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) -> Option<Barrier> {
965        let is_finished = self
966            .actor_states
967            .get_mut(&actor_id)
968            .expect("should exist")
969            .collect(epoch);
970        if is_finished {
971            let state = self.actor_states.remove(&actor_id).expect("should exist");
972            if let Some(monitor_task_handle) = state.monitor_task_handle {
973                monitor_task_handle.abort();
974            }
975        }
976        self.graph_state.collect(actor_id, epoch);
977        self.graph_state.may_have_collected_all()
978    }
979
    /// Pop the barrier whose prev epoch is `prev_epoch`, together with all the
    /// progress reports collected for it. Delegates to the underlying graph state.
    pub(super) fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        self.graph_state.pop_barrier_to_complete(prev_epoch)
    }
983
    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if there's no actor error received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            // Keep receiving until every known actor has reported an error or
            // the 3s timeout fires, whichever comes first.
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            // The actor of the first failure (if identified) has already reported.
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        // Score every received error and pick the highest-scored one as the
        // likely root cause.
        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }
1014
1015    /// Report that a source has finished listing for a specific epoch
1016    pub(super) fn report_source_list_finished(
1017        &mut self,
1018        epoch: EpochPair,
1019        actor_id: ActorId,
1020        table_id: TableId,
1021        associated_source_id: SourceId,
1022    ) {
1023        // Find the correct partial graph state by matching the actor's partial graph id
1024        if let Some(actor_state) = self.actor_states.get(&actor_id)
1025            && actor_state.inflight_barriers.contains(&epoch.prev)
1026        {
1027            self.graph_state
1028                .list_finished_source_ids
1029                .entry(epoch.curr)
1030                .or_default()
1031                .push(PbListFinishedSource {
1032                    reporter_actor_id: actor_id,
1033                    table_id,
1034                    associated_source_id,
1035                });
1036        } else {
1037            warn!(
1038                ?epoch,
1039                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
1040            );
1041        }
1042    }
1043
1044    /// Report that a source has finished loading for a specific epoch
1045    pub(super) fn report_source_load_finished(
1046        &mut self,
1047        epoch: EpochPair,
1048        actor_id: ActorId,
1049        table_id: TableId,
1050        associated_source_id: SourceId,
1051    ) {
1052        // Find the correct partial graph state by matching the actor's partial graph id
1053        if let Some(actor_state) = self.actor_states.get(&actor_id)
1054            && actor_state.inflight_barriers.contains(&epoch.prev)
1055        {
1056            self.graph_state
1057                .load_finished_source_ids
1058                .entry(epoch.curr)
1059                .or_default()
1060                .push(PbLoadFinishedSource {
1061                    reporter_actor_id: actor_id,
1062                    table_id,
1063                    associated_source_id,
1064                });
1065        } else {
1066            warn!(
1067                ?epoch,
1068                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
1069            );
1070        }
1071    }
1072
1073    /// Report that a CDC source has updated its offset at least once
1074    pub(super) fn report_cdc_source_offset_updated(
1075        &mut self,
1076        epoch: EpochPair,
1077        actor_id: ActorId,
1078        source_id: SourceId,
1079    ) {
1080        if let Some(actor_state) = self.actor_states.get(&actor_id)
1081            && actor_state.inflight_barriers.contains(&epoch.prev)
1082        {
1083            self.graph_state
1084                .cdc_source_offset_updated
1085                .entry(epoch.curr)
1086                .or_default()
1087                .push(PbCdcSourceOffsetUpdated {
1088                    reporter_actor_id: actor_id.as_raw_id(),
1089                    source_id: source_id.as_raw_id(),
1090                });
1091        } else {
1092            warn!(
1093                ?epoch,
1094                %actor_id, %source_id, "ignore cdc source offset updated"
1095            );
1096        }
1097    }
1098
1099    /// Report that a table has finished refreshing for a specific epoch
1100    pub(super) fn report_refresh_finished(
1101        &mut self,
1102        epoch: EpochPair,
1103        actor_id: ActorId,
1104        table_id: TableId,
1105        staging_table_id: TableId,
1106    ) {
1107        // Find the correct partial graph state by matching the actor's partial graph id
1108        let Some(actor_state) = self.actor_states.get(&actor_id) else {
1109            warn!(
1110                ?epoch,
1111                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
1112            );
1113            return;
1114        };
1115        if !actor_state.inflight_barriers.contains(&epoch.prev) {
1116            warn!(
1117                ?epoch,
1118                %actor_id,
1119                %table_id,
1120                inflight_barriers = ?actor_state.inflight_barriers,
1121                "ignore refresh finished table: partial_graph_id not found in inflight_barriers"
1122            );
1123            return;
1124        };
1125        self.graph_state
1126            .refresh_finished_tables
1127            .entry(epoch.curr)
1128            .or_default()
1129            .insert(table_id);
1130        self.graph_state
1131            .truncate_tables
1132            .entry(epoch.curr)
1133            .or_default()
1134            .insert(staging_table_id);
1135    }
1136}
1137
1138impl PartialGraphManagedBarrierState {
    /// This method is called when barrier state is modified in either `Issued` or `Stashed`
    /// to transform the state to `AllCollected` and start state store `sync` when the barrier
    /// has been collected from all actors for an `Issued` barrier.
    ///
    /// Barriers are examined in ascending epoch order, and the scan stops at the
    /// first barrier still waiting for actors, so a later barrier never completes
    /// before an earlier one. Returns the barrier that has just transitioned to
    /// `AllCollected`, if any.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                // Issued with no remaining actors: fall through to finalize below.
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                // Already finalized on a previous call: check the next epoch.
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                // Still waiting on some actors: later epochs cannot be complete
                // either, so stop scanning.
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            // Drain every per-epoch progress/report map for this barrier's curr
            // epoch into the `AllCollected` payload.
            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let cdc_source_offset_updated = self
                .cdc_source_offset_updated
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            // Transition Issued -> AllCollected in place, keeping the old state
            // so we can stop its inflight-latency timer below.
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                },
            );

            // The previous state must be `Issued`; observe the inflight latency.
            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }
1226
1227    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
1228        let (popped_prev_epoch, barrier_state) = self
1229            .epoch_barrier_state_map
1230            .pop_first()
1231            .expect("should exist");
1232
1233        assert_eq!(prev_epoch, popped_prev_epoch);
1234
1235        let (
1236            create_mview_progress,
1237            list_finished_source_ids,
1238            load_finished_source_ids,
1239            cdc_table_backfill_progress,
1240            cdc_source_offset_updated,
1241            truncate_tables,
1242            refresh_finished_tables,
1243        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
1244            create_mview_progress,
1245            list_finished_source_ids,
1246            load_finished_source_ids,
1247            truncate_tables,
1248            refresh_finished_tables,
1249            cdc_table_backfill_progress,
1250            cdc_source_offset_updated,
1251        } => {
1252            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, cdc_source_offset_updated, truncate_tables, refresh_finished_tables)
1253        });
1254        BarrierToComplete {
1255            barrier: barrier_state.barrier,
1256            table_ids: barrier_state.table_ids,
1257            create_mview_progress,
1258            list_finished_source_ids,
1259            load_finished_source_ids,
1260            truncate_tables,
1261            refresh_finished_tables,
1262            cdc_table_backfill_progress,
1263            cdc_source_offset_updated,
1264        }
1265    }
1266}
1267
/// A barrier that has been collected from all actors, together with the
/// auxiliary reports gathered while it was inflight (produced from the
/// `AllCollected` state when the barrier is popped for completion).
pub(crate) struct BarrierToComplete {
    /// The barrier being completed.
    pub barrier: Barrier,
    /// Table ids carried over from the previous barrier; `Some` only for
    /// checkpoint barriers (see `transform_to_issued`).
    pub table_ids: Option<HashSet<TableId>>,
    /// Per-actor backfill progress for materialized views being created.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    /// Sources reported as having finished listing in this epoch.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Sources reported as having finished loading in this epoch.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// Staging tables to truncate after a finished refresh.
    pub truncate_tables: Vec<TableId>,
    /// Tables whose refresh finished in this epoch.
    pub refresh_finished_tables: Vec<TableId>,
    /// Per-actor CDC table backfill progress.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    /// CDC sources that have updated their offsets at least once.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
}
1279
1280impl PartialGraphManagedBarrierState {
1281    /// Collect a `barrier` from the actor with `actor_id`.
1282    pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
1283        let actor_id = actor_id.into();
1284        tracing::debug!(
1285            target: "events::stream::barrier::manager::collect",
1286            ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
1287            "collect_barrier",
1288        );
1289
1290        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
1291            None => {
1292                // If the barrier's state is stashed, this occurs exclusively in scenarios where the barrier has not been
1293                // injected by the barrier manager, or the barrier message is blocked at the `RemoteInput` side waiting for injection.
1294                // Given these conditions, it's inconceivable for an actor to attempt collect at this point.
1295                panic!(
1296                    "cannot collect new actor barrier {:?} at current state: None",
1297                    epoch,
1298                )
1299            }
1300            Some(&mut BarrierState {
1301                ref barrier,
1302                inner:
1303                    ManagedBarrierStateInner::Issued(IssuedState {
1304                        ref mut remaining_actors,
1305                        ..
1306                    }),
1307                ..
1308            }) => {
1309                let exist = remaining_actors.remove(&actor_id);
1310                assert!(
1311                    exist,
1312                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
1313                    actor_id, epoch.curr
1314                );
1315                assert_eq!(barrier.epoch.curr, epoch.curr);
1316            }
1317            Some(BarrierState { inner, .. }) => {
1318                panic!(
1319                    "cannot collect new actor barrier {:?} at current state: {:?}",
1320                    epoch, inner
1321                )
1322            }
1323        }
1324    }
1325
    /// When the meta service issues a `send_barrier` request, call this function to transform to
    /// `Issued` and start to collect or to notify.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        // Stopped in `may_have_collected_all` once the barrier is collected from
        // all actors.
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        // Maintain `(epoch, table_ids)` of the previous barrier; only checkpoint
        // barriers carry table ids forward in `BarrierState.table_ids`.
        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                // First barrier ever seen: record the table ids, nothing to seal.
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                // Non-checkpoint barrier: epochs must be contiguous and the
                // table set must not change between consecutive barriers.
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            // Checkpoint barrier: take the previous barrier's table ids when the
            // epochs are contiguous; otherwise reinitialize with an empty set.
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        // Sanity check: this branch always panics — the same epoch must not be
        // issued twice.
        if let Some(&mut BarrierState { ref inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            {
                panic!(
                    "barrier epochs{:?} state has already been `Issued`. Current state: {:?}",
                    barrier.epoch, inner
                );
            }
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }
1405
1406    #[cfg(test)]
1407    async fn pop_next_completed_epoch(&mut self) -> u64 {
1408        if let Some(barrier) = self.may_have_collected_all() {
1409            self.pop_barrier_to_complete(barrier.epoch.prev);
1410            return barrier.epoch.prev;
1411        }
1412        pending().await
1413    }
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418    use std::collections::HashSet;
1419
1420    use risingwave_common::util::epoch::test_epoch;
1421
1422    use crate::executor::Barrier;
1423    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;
1424
1425    #[tokio::test]
1426    async fn test_managed_state_add_actor() {
1427        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1428        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1429        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1430        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1431        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into()]);
1432        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into()]);
1433        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into(), 3.into()]);
1434        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1435        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1436        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1437        managed_barrier_state.collect(1, barrier1.epoch);
1438        managed_barrier_state.collect(2, barrier1.epoch);
1439        assert_eq!(
1440            managed_barrier_state.pop_next_completed_epoch().await,
1441            test_epoch(0)
1442        );
1443        assert_eq!(
1444            managed_barrier_state
1445                .epoch_barrier_state_map
1446                .first_key_value()
1447                .unwrap()
1448                .0,
1449            &test_epoch(1)
1450        );
1451        managed_barrier_state.collect(1, barrier2.epoch);
1452        managed_barrier_state.collect(1, barrier3.epoch);
1453        managed_barrier_state.collect(2, barrier2.epoch);
1454        assert_eq!(
1455            managed_barrier_state.pop_next_completed_epoch().await,
1456            test_epoch(1)
1457        );
1458        assert_eq!(
1459            managed_barrier_state
1460                .epoch_barrier_state_map
1461                .first_key_value()
1462                .unwrap()
1463                .0,
1464            &test_epoch(2)
1465        );
1466        managed_barrier_state.collect(2, barrier3.epoch);
1467        managed_barrier_state.collect(3, barrier3.epoch);
1468        assert_eq!(
1469            managed_barrier_state.pop_next_completed_epoch().await,
1470            test_epoch(2)
1471        );
1472        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1473    }
1474
1475    #[tokio::test]
1476    async fn test_managed_state_stop_actor() {
1477        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
1478        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
1479        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
1480        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
1481        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]);
1482        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into(), 3.into()]);
1483        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into()]);
1484        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
1485        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
1486        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
1487
1488        managed_barrier_state.collect(1, barrier1.epoch);
1489        managed_barrier_state.collect(1, barrier2.epoch);
1490        managed_barrier_state.collect(1, barrier3.epoch);
1491        managed_barrier_state.collect(2, barrier1.epoch);
1492        managed_barrier_state.collect(2, barrier2.epoch);
1493        managed_barrier_state.collect(2, barrier3.epoch);
1494        assert_eq!(
1495            managed_barrier_state
1496                .epoch_barrier_state_map
1497                .first_key_value()
1498                .unwrap()
1499                .0,
1500            &0
1501        );
1502        managed_barrier_state.collect(3, barrier1.epoch);
1503        managed_barrier_state.collect(3, barrier2.epoch);
1504        assert_eq!(
1505            managed_barrier_state
1506                .epoch_barrier_state_map
1507                .first_key_value()
1508                .unwrap()
1509                .0,
1510            &0
1511        );
1512        managed_barrier_state.collect(4, barrier1.epoch);
1513        assert_eq!(
1514            managed_barrier_state.pop_next_completed_epoch().await,
1515            test_epoch(0)
1516        );
1517        assert_eq!(
1518            managed_barrier_state.pop_next_completed_epoch().await,
1519            test_epoch(1)
1520        );
1521        assert_eq!(
1522            managed_barrier_state.pop_next_completed_epoch().await,
1523            test_epoch(2)
1524        );
1525        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
1526    }
1527}