risingwave_meta/barrier/
partial_graph.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::Bound::Unbounded;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::ops::{Bound, RangeBounds};
20use std::sync::Arc;
21use std::time::Instant;
22
23use educe::Educe;
24use itertools::Itertools;
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
28use risingwave_pb::stream_plan::barrier_mutation::Mutation;
29use risingwave_pb::stream_service::BarrierCompleteResponse;
30use risingwave_pb::stream_service::streaming_control_stream_response::{
31    ResetPartialGraphResponse, Response,
32};
33use tracing::{debug, warn};
34use uuid::Uuid;
35
36use crate::barrier::BarrierKind;
37use crate::barrier::command::PostCollectCommand;
38use crate::barrier::context::GlobalBarrierWorkerContext;
39use crate::barrier::info::BarrierInfo;
40use crate::barrier::notifier::Notifier;
41use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
42use crate::barrier::utils::{BarrierItemCollector, NodeToCollect, is_valid_after_worker_err};
43use crate::manager::MetaSrvEnv;
44use crate::model::StreamJobActorsToCreate;
45use crate::{MetaError, MetaResult};
46
47#[derive(Debug)]
48pub(super) struct PartialGraphBarrierInfo {
49    enqueue_time: Instant,
50    pub(super) post_collect_command: PostCollectCommand,
51    pub(super) barrier_info: BarrierInfo,
52    pub(super) notifiers: Vec<Notifier>,
53    pub(super) table_ids_to_commit: HashSet<TableId>,
54}
55
56impl PartialGraphBarrierInfo {
57    pub(super) fn new(
58        post_collect_command: PostCollectCommand,
59        barrier_info: BarrierInfo,
60        notifiers: Vec<Notifier>,
61        table_ids_to_commit: HashSet<TableId>,
62    ) -> Self {
63        Self {
64            enqueue_time: Instant::now(),
65            post_collect_command,
66            barrier_info,
67            notifiers,
68            table_ids_to_commit,
69        }
70    }
71
72    pub(super) fn elapsed_secs(&self) -> f64 {
73        self.enqueue_time.elapsed().as_secs_f64()
74    }
75}
76
/// Sink for per-partial-graph barrier statistics.
///
/// An implementation is boxed and owned by the running state of a partial graph, which
/// calls back into it as barriers are enqueued and collected.
pub(super) trait PartialGraphStat: Send + Sync + 'static {
    /// Reports the latency of a barrier, in seconds, once it is reported as collected.
    /// `barrier_latency_secs` is the time elapsed since the barrier's info was created.
    fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64);
    /// Reports the current number of in-flight and collected barriers held by the
    /// graph's barrier collector.
    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize);
}
81
82#[derive(Educe)]
83#[educe(Debug)]
84struct PartialGraphRunningState {
85    barrier_item_collector:
86        BarrierItemCollector<WorkerId, BarrierCompleteResponse, PartialGraphBarrierInfo>,
87    completing_epoch: Option<u64>,
88    #[educe(Debug(ignore))]
89    stat: Box<dyn PartialGraphStat>,
90}
91
92impl PartialGraphRunningState {
93    fn new(stat: Box<dyn PartialGraphStat>) -> Self {
94        Self {
95            barrier_item_collector: BarrierItemCollector::new(),
96            completing_epoch: None,
97            stat,
98        }
99    }
100
101    fn is_empty(&self) -> bool {
102        self.barrier_item_collector.is_empty() && self.completing_epoch.is_none()
103    }
104
105    fn enqueue(&mut self, node_to_collect: NodeToCollect, mut info: PartialGraphBarrierInfo) {
106        let epoch = info.barrier_info.epoch();
107        assert_ne!(info.barrier_info.kind, BarrierKind::Initial);
108        if info.post_collect_command.should_checkpoint() {
109            assert!(info.barrier_info.kind.is_checkpoint());
110        }
111        info.notifiers.iter_mut().for_each(|n| n.notify_started());
112        self.barrier_item_collector
113            .enqueue(epoch, node_to_collect, info);
114        self.stat.observe_barrier_num(
115            self.barrier_item_collector.inflight_barrier_num(),
116            self.barrier_item_collector.collected_barrier_num(),
117        );
118    }
119
120    fn collect(&mut self, resp: BarrierCompleteResponse) {
121        debug!(
122            epoch = resp.epoch,
123            worker_id = %resp.worker_id,
124            partial_graph_id = %resp.partial_graph_id,
125            "collect barrier from worker"
126        );
127        self.barrier_item_collector
128            .collect(resp.epoch, resp.worker_id, resp);
129    }
130
131    fn barrier_collected<'a>(
132        &mut self,
133        temp_ref: &'a CollectedBarrierTempRef,
134    ) -> Option<CollectedBarrier<'a>> {
135        if let Some((epoch, info)) = self.barrier_item_collector.barrier_collected() {
136            self.stat
137                .observe_barrier_latency(epoch, info.elapsed_secs());
138            self.stat.observe_barrier_num(
139                self.barrier_item_collector.inflight_barrier_num(),
140                self.barrier_item_collector.collected_barrier_num(),
141            );
142            Some(temp_ref.collected_barrier(epoch))
143        } else {
144            None
145        }
146    }
147}
148
149#[derive(Debug)]
150struct ResetPartialGraphCollector {
151    remaining_workers: HashSet<WorkerId>,
152    reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
153}
154
155impl ResetPartialGraphCollector {
156    fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
157        assert!(self.remaining_workers.remove(&worker_id));
158        self.reset_resps
159            .try_insert(worker_id, resp)
160            .expect("non-duplicate");
161        self.remaining_workers.is_empty()
162    }
163}
164
165#[derive(Educe)]
166#[educe(Debug)]
167enum PartialGraphStatus {
168    Running(PartialGraphRunningState),
169    Resetting(ResetPartialGraphCollector),
170    Initializing {
171        epoch: EpochPair,
172        node_to_collect: NodeToCollect,
173        #[educe(Debug(ignore))]
174        stat: Option<Box<dyn PartialGraphStat>>,
175    },
176}
177
178impl PartialGraphStatus {
179    fn collect<'a>(
180        &mut self,
181        worker_id: WorkerId,
182        resp: BarrierCompleteResponse,
183        temp_ref: &'a CollectedBarrierTempRef,
184    ) -> Option<PartialGraphEvent<'a>> {
185        assert_eq!(worker_id, resp.worker_id);
186        match self {
187            PartialGraphStatus::Running(state) => {
188                state.collect(resp);
189                state
190                    .barrier_collected(temp_ref)
191                    .map(PartialGraphEvent::BarrierCollected)
192            }
193            PartialGraphStatus::Resetting(_) => None,
194            PartialGraphStatus::Initializing {
195                epoch,
196                node_to_collect,
197                stat,
198            } => {
199                assert_eq!(epoch.prev, resp.epoch);
200                assert!(node_to_collect.remove(&worker_id));
201                if node_to_collect.is_empty() {
202                    *self = PartialGraphStatus::Running(PartialGraphRunningState::new(
203                        stat.take().expect("should be taken for once"),
204                    ));
205                    Some(PartialGraphEvent::Initialized)
206                } else {
207                    None
208                }
209            }
210        }
211    }
212}
213
214/// Holding the reference to a static empty map as a temporary workaround for borrow checker limitation on `CollectedBarrier`.
215/// Mod local method can create a `CollectedBarrierTempRef` locally, and the created `CollectedBarrier` will temporarily reference
216/// to the `CollectedBarrierTempRef`. The method must fill in the correct reference before returning in a mod-pub method.
217///
218/// This struct *must* be private to ensure that the invalid lifetime won't be leaked outside.
219struct CollectedBarrierTempRef {
220    resps: &'static HashMap<WorkerId, BarrierCompleteResponse>,
221}
222
223impl CollectedBarrierTempRef {
224    fn new() -> Self {
225        static EMPTY: std::sync::LazyLock<HashMap<WorkerId, BarrierCompleteResponse>> =
226            std::sync::LazyLock::new(HashMap::new);
227        CollectedBarrierTempRef { resps: &EMPTY }
228    }
229
230    fn collected_barrier(&self, epoch: EpochPair) -> CollectedBarrier<'_> {
231        CollectedBarrier {
232            epoch,
233            resps: self.resps,
234        }
235    }
236
237    fn correct_lifetime<'a>(
238        &self,
239        event: PartialGraphManagerEvent<'_>,
240        manager: &'a PartialGraphManager,
241    ) -> PartialGraphManagerEvent<'a> {
242        match event {
243            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
244                let event = match event {
245                    PartialGraphEvent::BarrierCollected(collected) => {
246                        let state = manager.running_graph(partial_graph_id);
247                        let (epoch, resps, _) = state
248                            .barrier_item_collector
249                            .last_collected()
250                            .expect("should exist");
251                        assert_eq!(epoch, collected.epoch);
252                        PartialGraphEvent::BarrierCollected(CollectedBarrier { epoch, resps })
253                    }
254                    PartialGraphEvent::Reset(resps) => PartialGraphEvent::Reset(resps),
255                    PartialGraphEvent::Initialized => PartialGraphEvent::Initialized,
256                    PartialGraphEvent::Error(worker_id) => PartialGraphEvent::Error(worker_id),
257                };
258                PartialGraphManagerEvent::PartialGraph(partial_graph_id, event)
259            }
260            PartialGraphManagerEvent::Worker(worker_id, event) => {
261                PartialGraphManagerEvent::Worker(worker_id, event)
262            }
263        }
264    }
265}
266
/// A barrier that has been collected, together with a borrowed view of the
/// per-worker responses gathered for it.
#[derive(Debug)]
pub(super) struct CollectedBarrier<'a> {
    /// Epoch of the collected barrier.
    pub epoch: EpochPair,
    /// Barrier-complete responses keyed by the worker that sent them.
    pub resps: &'a HashMap<WorkerId, BarrierCompleteResponse>,
}
272
/// Event concerning a single partial graph.
pub(super) enum PartialGraphEvent<'a> {
    /// A barrier of a running graph has been collected.
    BarrierCollected(CollectedBarrier<'a>),
    /// A resetting graph has no worker left to wait for; carries the reset responses
    /// received, keyed by worker. The graph is removed from the manager when this is emitted.
    Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
    /// An initializing graph has heard back from every awaited worker and is now running.
    Initialized,
    /// The given worker reported a failure of the graph while it was running or initializing.
    Error(WorkerId),
}
279
/// Event concerning a worker node rather than one specific partial graph.
pub(super) enum WorkerEvent {
    /// The control stream of the worker yielded an error.
    WorkerError {
        /// The error that was received.
        err: MetaError,
        /// Running or initializing graphs whose pending collection is not considered
        /// valid after this worker error (as decided by `is_valid_after_worker_err`).
        affected_partial_graphs: HashSet<PartialGraphId>,
    },
    /// The worker has connected and was initialized with the existing partial graphs.
    WorkerConnected,
}
287
288fn existing_graphs(
289    graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
290) -> impl Iterator<Item = PartialGraphId> + '_ {
291    graphs
292        .iter()
293        .filter_map(|(partial_graph_id, status)| match status {
294            PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
295                Some(*partial_graph_id)
296            }
297            PartialGraphStatus::Resetting(_) => None,
298        })
299}
300
/// Tracks the status of every partial graph and owns the control streams to the workers.
pub(super) struct PartialGraphManager {
    /// Control streams used to talk to the worker nodes.
    control_stream_manager: ControlStreamManager,
    /// Identifier passed along to the control stream manager; a fresh UUID after `recover`,
    /// or the literal "uninitialized" for a manager built by `uninitialized`.
    term_id: String,
    /// Status of each known partial graph.
    graphs: HashMap<PartialGraphId, PartialGraphStatus>,
}
306
307impl PartialGraphManager {
308    pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
309        Self {
310            control_stream_manager: ControlStreamManager::new(env),
311            term_id: "uninitialized".to_owned(),
312            graphs: HashMap::new(),
313        }
314    }
315
316    pub(super) async fn recover(
317        env: MetaSrvEnv,
318        nodes: &HashMap<WorkerId, WorkerNode>,
319        context: Arc<impl GlobalBarrierWorkerContext>,
320    ) -> Self {
321        let term_id = Uuid::new_v4().to_string();
322        let control_stream_manager =
323            ControlStreamManager::recover(env, nodes, &term_id, context).await;
324        Self {
325            control_stream_manager,
326            term_id,
327            graphs: Default::default(),
328        }
329    }
330
331    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
332        &self.control_stream_manager
333    }
334
335    pub(super) async fn add_worker(
336        &mut self,
337        node: WorkerNode,
338        context: &impl GlobalBarrierWorkerContext,
339    ) {
340        self.control_stream_manager
341            .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
342            .await
343    }
344
345    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
346        self.control_stream_manager.remove_worker(node);
347    }
348
349    pub(super) fn clear_worker(&mut self) {
350        self.control_stream_manager.clear();
351    }
352
353    pub(crate) fn notify_all_err(&mut self, err: &MetaError) {
354        for (_, graph) in self.graphs.drain() {
355            if let PartialGraphStatus::Running(graph) = graph {
356                for info in graph.barrier_item_collector.into_infos() {
357                    for notifier in info.notifiers {
358                        notifier.notify_collection_failed(err.clone());
359                    }
360                }
361            }
362        }
363    }
364}
365
366#[must_use]
367pub(super) struct PartialGraphAdder<'a> {
368    partial_graph_id: PartialGraphId,
369    manager: &'a mut PartialGraphManager,
370    consumed: bool,
371}
372
373impl PartialGraphAdder<'_> {
374    pub(super) fn added(mut self) {
375        self.consumed = true;
376    }
377
378    pub(super) fn failed(mut self) {
379        self.manager.reset_partial_graphs([self.partial_graph_id]);
380        self.consumed = true;
381    }
382
383    pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
384        self.manager
385    }
386}
387
388impl Drop for PartialGraphAdder<'_> {
389    fn drop(&mut self) {
390        debug_assert!(self.consumed, "unconsumed graph adder");
391        if !self.consumed {
392            warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
393        }
394    }
395}
396
397impl PartialGraphManager {
398    pub(super) fn add_partial_graph(
399        &mut self,
400        partial_graph_id: PartialGraphId,
401        stat: impl PartialGraphStat,
402    ) -> PartialGraphAdder<'_> {
403        self.graphs
404            .try_insert(
405                partial_graph_id,
406                PartialGraphStatus::Running(PartialGraphRunningState::new(Box::new(stat))),
407            )
408            .expect("non-duplicated");
409        self.control_stream_manager
410            .add_partial_graph(partial_graph_id);
411        PartialGraphAdder {
412            partial_graph_id,
413            manager: self,
414            consumed: false,
415        }
416    }
417
418    pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
419        for partial_graph_id in &partial_graphs {
420            let graph = self.graphs.remove(partial_graph_id).expect("should exist");
421            let PartialGraphStatus::Running(state) = graph else {
422                panic!("graph to be explicitly removed should be running");
423            };
424            assert!(state.is_empty());
425        }
426        self.control_stream_manager
427            .remove_partial_graphs(partial_graphs);
428    }
429
430    pub(super) fn reset_partial_graphs(
431        &mut self,
432        partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
433    ) {
434        let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
435        let remaining_workers = self
436            .control_stream_manager
437            .reset_partial_graphs(partial_graph_ids.clone());
438        let new_collector = || ResetPartialGraphCollector {
439            remaining_workers: remaining_workers.clone(),
440            reset_resps: Default::default(),
441        };
442        for partial_graph_id in partial_graph_ids {
443            match self.graphs.entry(partial_graph_id) {
444                Entry::Vacant(entry) => {
445                    entry.insert(PartialGraphStatus::Resetting(new_collector()));
446                }
447                Entry::Occupied(mut entry) => {
448                    let graph = entry.get_mut();
449                    match graph {
450                        PartialGraphStatus::Resetting(_) => {
451                            unreachable!("should not reset again")
452                        }
453                        PartialGraphStatus::Running(_)
454                        | PartialGraphStatus::Initializing { .. } => {
455                            *graph = PartialGraphStatus::Resetting(new_collector());
456                        }
457                    }
458                }
459            }
460        }
461    }
462
463    pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
464        let graph = self.graphs.get(&partial_graph_id).expect("should exist");
465        let PartialGraphStatus::Resetting(..) = graph else {
466            panic!("should be at resetting but at {:?}", graph);
467        };
468    }
469
470    pub(super) fn inject_barrier(
471        &mut self,
472        partial_graph_id: PartialGraphId,
473        mutation: Option<Mutation>,
474        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
475        table_ids_to_sync: impl Iterator<Item = TableId>,
476        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
477        new_actors: Option<StreamJobActorsToCreate>,
478        info: PartialGraphBarrierInfo,
479    ) -> MetaResult<()> {
480        let graph = self
481            .graphs
482            .get_mut(&partial_graph_id)
483            .expect("should exist");
484        let node_to_collect = self.control_stream_manager.inject_barrier(
485            partial_graph_id,
486            mutation,
487            &info.barrier_info,
488            node_actors,
489            table_ids_to_sync,
490            nodes_to_sync_table,
491            new_actors,
492        )?;
493        let PartialGraphStatus::Running(state) = graph else {
494            panic!("should not inject barrier on non-running status: {graph:?}")
495        };
496        state.enqueue(node_to_collect, info);
497        Ok(())
498    }
499
500    fn running_graph(&self, partial_graph_id: PartialGraphId) -> &PartialGraphRunningState {
501        let PartialGraphStatus::Running(graph) = &self.graphs[&partial_graph_id] else {
502            unreachable!("should be running")
503        };
504        graph
505    }
506
507    fn running_graph_mut(
508        &mut self,
509        partial_graph_id: PartialGraphId,
510    ) -> &mut PartialGraphRunningState {
511        let PartialGraphStatus::Running(graph) = self
512            .graphs
513            .get_mut(&partial_graph_id)
514            .expect("should exist")
515        else {
516            unreachable!("should be running")
517        };
518        graph
519    }
520
521    pub(super) fn inflight_barrier_num(&self, partial_graph_id: PartialGraphId) -> usize {
522        self.running_graph(partial_graph_id)
523            .barrier_item_collector
524            .inflight_barrier_num()
525    }
526
527    pub(super) fn first_inflight_barrier(
528        &self,
529        partial_graph_id: PartialGraphId,
530    ) -> Option<EpochPair> {
531        self.running_graph(partial_graph_id)
532            .barrier_item_collector
533            .first_inflight_epoch()
534    }
535
536    pub(super) fn start_completing(
537        &mut self,
538        partial_graph_id: PartialGraphId,
539        epoch_end_bound: Bound<u64>,
540        mut on_non_checkpoint_epoch: impl FnMut(
541            EpochPair,
542            HashMap<WorkerId, BarrierCompleteResponse>,
543            PostCollectCommand,
544        ),
545    ) -> Option<(
546        u64,
547        HashMap<WorkerId, BarrierCompleteResponse>,
548        PartialGraphBarrierInfo,
549    )> {
550        let graph = self.running_graph_mut(partial_graph_id);
551        assert!(graph.completing_epoch.is_none());
552        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
553        while let Some((epoch, resps, info)) = graph
554            .barrier_item_collector
555            .take_collected_if(|epoch| epoch_range.contains(&epoch.prev))
556        {
557            if info.post_collect_command.should_checkpoint() {
558                assert!(info.barrier_info.kind.is_checkpoint());
559            } else if !info.barrier_info.kind.is_checkpoint() {
560                info.notifiers
561                    .into_iter()
562                    .for_each(Notifier::notify_collected);
563                on_non_checkpoint_epoch(epoch, resps, info.post_collect_command);
564                continue;
565            }
566            let prev_epoch = info.barrier_info.prev_epoch();
567            graph.completing_epoch = Some(prev_epoch);
568            return Some((prev_epoch, resps, info));
569        }
570        None
571    }
572
573    pub(super) fn ack_completed(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
574        assert_eq!(
575            self.running_graph_mut(partial_graph_id)
576                .completing_epoch
577                .take(),
578            Some(prev_epoch)
579        );
580    }
581
582    pub(super) fn has_pending_checkpoint_barrier(&self, partial_graph_id: PartialGraphId) -> bool {
583        self.running_graph(partial_graph_id)
584            .barrier_item_collector
585            .iter_infos()
586            .any(|info| info.barrier_info.kind.is_checkpoint())
587    }
588
589    pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
590        PartialGraphRecoverer {
591            added_partial_graphs: Default::default(),
592            manager: self,
593            consumed: false,
594        }
595    }
596}
597
598#[must_use]
599pub(super) struct PartialGraphRecoverer<'a> {
600    added_partial_graphs: HashSet<PartialGraphId>,
601    manager: &'a mut PartialGraphManager,
602    consumed: bool,
603}
604
605impl PartialGraphRecoverer<'_> {
606    pub(super) fn recover_graph(
607        &mut self,
608        partial_graph_id: PartialGraphId,
609        mutation: Mutation,
610        barrier_info: &BarrierInfo,
611        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
612        table_ids_to_sync: impl Iterator<Item = TableId>,
613        new_actors: StreamJobActorsToCreate,
614        stat: impl PartialGraphStat,
615    ) -> MetaResult<()> {
616        assert!(
617            self.added_partial_graphs.insert(partial_graph_id),
618            "duplicated recover graph {partial_graph_id}"
619        );
620        self.manager
621            .control_stream_manager
622            .add_partial_graph(partial_graph_id);
623        assert!(barrier_info.kind.is_initial());
624        let node_to_collect = self.manager.control_stream_manager.inject_barrier(
625            partial_graph_id,
626            Some(mutation),
627            barrier_info,
628            node_actors,
629            table_ids_to_sync,
630            node_actors.keys().copied(),
631            Some(new_actors),
632        )?;
633        self.manager
634            .graphs
635            .try_insert(
636                partial_graph_id,
637                PartialGraphStatus::Initializing {
638                    epoch: barrier_info.epoch(),
639                    node_to_collect,
640                    stat: Some(Box::new(stat)),
641                },
642            )
643            .expect("non-duplicated");
644        Ok(())
645    }
646
647    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
648        &self.manager.control_stream_manager
649    }
650
651    pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
652        self.consumed = true;
653        take(&mut self.added_partial_graphs)
654    }
655
656    pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
657        self.manager
658            .reset_partial_graphs(self.added_partial_graphs.iter().copied());
659        self.consumed = true;
660        take(&mut self.added_partial_graphs)
661    }
662}
663
664impl Drop for PartialGraphRecoverer<'_> {
665    fn drop(&mut self) {
666        debug_assert!(self.consumed, "unconsumed graph recoverer");
667        if !self.consumed {
668            warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
669        }
670    }
671}
672
/// Event produced by `PartialGraphManager::next_event`.
#[must_use]
pub(super) enum PartialGraphManagerEvent<'a> {
    /// Something happened to the partial graph with the given id.
    PartialGraph(PartialGraphId, PartialGraphEvent<'a>),
    /// Something happened to the worker with the given id.
    Worker(WorkerId, WorkerEvent),
}
678
679impl PartialGraphManager {
680    pub(super) async fn next_event<'a>(
681        &'a mut self,
682        context: &Arc<impl GlobalBarrierWorkerContext>,
683    ) -> PartialGraphManagerEvent<'a> {
684        let temp_ref = CollectedBarrierTempRef::new();
685        let event = self.next_event_inner(context, &temp_ref).await;
686        temp_ref.correct_lifetime(event, self)
687    }
688
    /// Produces the next event, borrowing any collected barrier from `temp_ref` instead of
    /// from `self`.
    ///
    /// The graphs are first scanned for an event that is already available without waiting.
    /// If there is none, responses are pulled from the control stream manager until one of
    /// them results in an event.
    async fn next_event_inner<'a>(
        &mut self,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        temp_ref: &'a CollectedBarrierTempRef,
    ) -> PartialGraphManagerEvent<'a> {
        // Phase 1: report anything that is already ready.
        for (&partial_graph_id, graph) in &mut self.graphs {
            match graph {
                PartialGraphStatus::Running(state) => {
                    if let Some(collected) = state.barrier_collected(temp_ref) {
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::BarrierCollected(collected),
                        );
                    }
                }
                PartialGraphStatus::Resetting(collector) => {
                    // No worker left to wait for (e.g. the remaining ones were dropped after a
                    // worker error below): the reset is done and the graph is forgotten.
                    if collector.remaining_workers.is_empty() {
                        let resps = take(&mut collector.reset_resps);
                        self.graphs.remove(&partial_graph_id);
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Reset(resps),
                        );
                    }
                }
                PartialGraphStatus::Initializing {
                    node_to_collect,
                    stat,
                    ..
                } => {
                    // Nothing to collect from: the graph can start running right away.
                    if node_to_collect.is_empty() {
                        *graph = PartialGraphStatus::Running(PartialGraphRunningState::new(
                            stat.take().expect("should be taken once"),
                        ));
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Initialized,
                        );
                    }
                }
            }
        }
        // Phase 2: wait on the control streams until a response yields an event.
        loop {
            let (worker_id, event) = self
                .control_stream_manager
                .next_event(&self.term_id, context)
                .await;
            match event {
                WorkerNodeEvent::Response(result) => match result {
                    Ok(resp) => match resp {
                        Response::CompleteBarrier(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            // The status decides what to do with the response; keep waiting
                            // when it does not produce an event.
                            if let Some(event) = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist")
                                .collect(worker_id, resp, temp_ref)
                            {
                                return PartialGraphManagerEvent::PartialGraph(
                                    partial_graph_id,
                                    event,
                                );
                            }
                        }
                        Response::ReportPartialGraphFailure(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Resetting(_) => {
                                    // ignore reported error when resetting
                                }
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    return PartialGraphManagerEvent::PartialGraph(
                                        partial_graph_id,
                                        PartialGraphEvent::Error(worker_id),
                                    );
                                }
                            }
                        }
                        Response::ResetPartialGraph(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    // A reset response for a graph that is not resetting is
                                    // unexpected: panic in debug builds, warn and ignore it
                                    // otherwise.
                                    if cfg!(debug_assertions) {
                                        unreachable!(
                                            "should not have reset request when not in resetting state"
                                        )
                                    } else {
                                        warn!(
                                            ?resp,
                                            "ignore reset resp when not in Resetting state"
                                        );
                                    }
                                }
                                PartialGraphStatus::Resetting(collector) => {
                                    // `collect` returns true once every awaited worker has
                                    // responded; the graph is then forgotten.
                                    if collector.collect(worker_id, resp) {
                                        let resps = take(&mut collector.reset_resps);
                                        self.graphs.remove(&partial_graph_id);
                                        return PartialGraphManagerEvent::PartialGraph(
                                            partial_graph_id,
                                            PartialGraphEvent::Reset(resps),
                                        );
                                    }
                                }
                            }
                        }
                        Response::Init(_) | Response::Shutdown(_) => {
                            unreachable!("should be handled in control stream manager")
                        }
                    },
                    Err(error) => {
                        // Work out which graphs are affected by the worker error.
                        let affected_partial_graphs = self
                            .graphs
                            .iter_mut()
                            .filter_map(|(partial_graph_id, graph)| match graph {
                                // Affected when any pending collection is not valid after the
                                // error of this worker.
                                PartialGraphStatus::Running(state) => state
                                    .barrier_item_collector
                                    .iter_to_collect()
                                    .any(|to_collect| {
                                        !is_valid_after_worker_err(to_collect, worker_id)
                                    })
                                    .then_some(*partial_graph_id),
                                // A resetting graph is never reported as affected; it just
                                // stops waiting for this worker. If that empties the set, the
                                // scan at the top of the next call emits the `Reset` event.
                                PartialGraphStatus::Resetting(collector) => {
                                    collector.remaining_workers.remove(&worker_id);
                                    None
                                }
                                PartialGraphStatus::Initializing {
                                    node_to_collect, ..
                                } => (!is_valid_after_worker_err(node_to_collect, worker_id))
                                    .then_some(*partial_graph_id),
                            })
                            .collect();
                        return PartialGraphManagerEvent::Worker(
                            worker_id,
                            WorkerEvent::WorkerError {
                                err: error,
                                affected_partial_graphs,
                            },
                        );
                    }
                },
                WorkerNodeEvent::Connected(connected) => {
                    // Initialize the newly connected worker with the graphs that are running
                    // or initializing.
                    connected.initialize(existing_graphs(&self.graphs));
                    return PartialGraphManagerEvent::Worker(
                        worker_id,
                        WorkerEvent::WorkerConnected,
                    );
                }
            }
        }
    }
849}