risingwave_meta/barrier/
partial_graph.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18use std::sync::Arc;
19use std::time::Instant;
20
21use educe::Educe;
22use itertools::Itertools;
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
26use risingwave_pb::stream_plan::barrier_mutation::Mutation;
27use risingwave_pb::stream_service::BarrierCompleteResponse;
28use risingwave_pb::stream_service::streaming_control_stream_response::{
29    ResetPartialGraphResponse, Response,
30};
31use tracing::{debug, warn};
32use uuid::Uuid;
33
34use crate::barrier::BarrierKind;
35use crate::barrier::command::PostCollectCommand;
36use crate::barrier::context::GlobalBarrierWorkerContext;
37use crate::barrier::info::BarrierInfo;
38use crate::barrier::notifier::Notifier;
39use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
40use crate::barrier::utils::{BarrierItemCollector, NodeToCollect, is_valid_after_worker_err};
41use crate::manager::MetaSrvEnv;
42use crate::model::StreamJobActorsToCreate;
43use crate::{MetaError, MetaResult};
44
45#[derive(Debug)]
46pub(super) struct PartialGraphBarrierInfo {
47    enqueue_time: Instant,
48    pub(super) post_collect_command: PostCollectCommand,
49    pub(super) barrier_info: BarrierInfo,
50    pub(super) notifiers: Vec<Notifier>,
51    pub(super) table_ids_to_commit: HashSet<TableId>,
52}
53
54impl PartialGraphBarrierInfo {
55    pub(super) fn new(
56        post_collect_command: PostCollectCommand,
57        barrier_info: BarrierInfo,
58        notifiers: Vec<Notifier>,
59        table_ids_to_commit: HashSet<TableId>,
60    ) -> Self {
61        Self {
62            enqueue_time: Instant::now(),
63            post_collect_command,
64            barrier_info,
65            notifiers,
66            table_ids_to_commit,
67        }
68    }
69
70    pub(super) fn elapsed_secs(&self) -> f64 {
71        self.enqueue_time.elapsed().as_secs_f64()
72    }
73}
74
75pub(super) trait PartialGraphStat: Send + Sync + 'static {
76    fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64);
77    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize);
78}
79
80#[derive(Educe)]
81#[educe(Debug)]
82struct PartialGraphRunningState {
83    barrier_item_collector:
84        BarrierItemCollector<WorkerId, BarrierCompleteResponse, PartialGraphBarrierInfo>,
85    #[educe(Debug(ignore))]
86    stat: Box<dyn PartialGraphStat>,
87}
88
89impl PartialGraphRunningState {
90    fn new(stat: Box<dyn PartialGraphStat>) -> Self {
91        Self {
92            barrier_item_collector: BarrierItemCollector::new(),
93            stat,
94        }
95    }
96
97    fn is_empty(&self) -> bool {
98        self.barrier_item_collector.is_empty()
99    }
100
101    fn enqueue(&mut self, node_to_collect: NodeToCollect, mut info: PartialGraphBarrierInfo) {
102        let epoch = info.barrier_info.epoch();
103        assert_ne!(info.barrier_info.kind, BarrierKind::Initial);
104        if info.post_collect_command.should_checkpoint() {
105            assert!(info.barrier_info.kind.is_checkpoint());
106        }
107        info.notifiers.iter_mut().for_each(|n| n.notify_started());
108        self.barrier_item_collector
109            .enqueue(epoch, node_to_collect, info);
110        self.stat.observe_barrier_num(
111            self.barrier_item_collector.inflight_barrier_num(),
112            self.barrier_item_collector.collected_barrier_num(),
113        );
114    }
115
116    fn collect(&mut self, resp: BarrierCompleteResponse) {
117        debug!(
118            epoch = resp.epoch,
119            worker_id = %resp.worker_id,
120            partial_graph_id = %resp.partial_graph_id,
121            "collect barrier from worker"
122        );
123        self.barrier_item_collector
124            .collect(resp.epoch, resp.worker_id, resp);
125    }
126
127    fn barrier_collected<'a>(
128        &mut self,
129        temp_ref: &'a CollectedBarrierTempRef,
130    ) -> Option<CollectedBarrier<'a>> {
131        if let Some((epoch, info)) = self.barrier_item_collector.barrier_collected() {
132            self.stat
133                .observe_barrier_latency(epoch, info.elapsed_secs());
134            self.stat.observe_barrier_num(
135                self.barrier_item_collector.inflight_barrier_num(),
136                self.barrier_item_collector.collected_barrier_num(),
137            );
138            Some(temp_ref.collected_barrier(epoch))
139        } else {
140            None
141        }
142    }
143}
144
145#[derive(Debug)]
146struct ResetPartialGraphCollector {
147    remaining_workers: HashSet<WorkerId>,
148    reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
149}
150
151impl ResetPartialGraphCollector {
152    fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
153        assert!(self.remaining_workers.remove(&worker_id));
154        self.reset_resps
155            .try_insert(worker_id, resp)
156            .expect("non-duplicate");
157        self.remaining_workers.is_empty()
158    }
159}
160
161#[derive(Educe)]
162#[educe(Debug)]
163enum PartialGraphStatus {
164    Running(PartialGraphRunningState),
165    Resetting(ResetPartialGraphCollector),
166    Initializing {
167        epoch: EpochPair,
168        node_to_collect: NodeToCollect,
169        #[educe(Debug(ignore))]
170        stat: Option<Box<dyn PartialGraphStat>>,
171    },
172}
173
174impl PartialGraphStatus {
175    fn collect<'a>(
176        &mut self,
177        worker_id: WorkerId,
178        resp: BarrierCompleteResponse,
179        temp_ref: &'a CollectedBarrierTempRef,
180    ) -> Option<PartialGraphEvent<'a>> {
181        assert_eq!(worker_id, resp.worker_id);
182        match self {
183            PartialGraphStatus::Running(state) => {
184                state.collect(resp);
185                state
186                    .barrier_collected(temp_ref)
187                    .map(PartialGraphEvent::BarrierCollected)
188            }
189            PartialGraphStatus::Resetting(_) => None,
190            PartialGraphStatus::Initializing {
191                epoch,
192                node_to_collect,
193                stat,
194            } => {
195                assert_eq!(epoch.prev, resp.epoch);
196                assert!(node_to_collect.remove(&worker_id));
197                if node_to_collect.is_empty() {
198                    *self = PartialGraphStatus::Running(PartialGraphRunningState::new(
199                        stat.take().expect("should be taken for once"),
200                    ));
201                    Some(PartialGraphEvent::Initialized)
202                } else {
203                    None
204                }
205            }
206        }
207    }
208}
209
210/// Holding the reference to a static empty map as a temporary workaround for borrow checker limitation on `CollectedBarrier`.
211/// Mod local method can create a `CollectedBarrierTempRef` locally, and the created `CollectedBarrier` will temporarily reference
212/// to the `CollectedBarrierTempRef`. The method must fill in the correct reference before returning in a mod-pub method.
213///
214/// This struct *must* be private to ensure that the invalid lifetime won't be leaked outside.
215struct CollectedBarrierTempRef {
216    resps: &'static HashMap<WorkerId, BarrierCompleteResponse>,
217}
218
219impl CollectedBarrierTempRef {
220    fn new() -> Self {
221        static EMPTY: std::sync::LazyLock<HashMap<WorkerId, BarrierCompleteResponse>> =
222            std::sync::LazyLock::new(HashMap::new);
223        CollectedBarrierTempRef { resps: &EMPTY }
224    }
225
226    fn collected_barrier(&self, epoch: EpochPair) -> CollectedBarrier<'_> {
227        CollectedBarrier {
228            epoch,
229            resps: self.resps,
230        }
231    }
232
233    fn correct_lifetime<'a>(
234        &self,
235        event: PartialGraphManagerEvent<'_>,
236        manager: &'a PartialGraphManager,
237    ) -> PartialGraphManagerEvent<'a> {
238        match event {
239            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
240                let event = match event {
241                    PartialGraphEvent::BarrierCollected(collected) => {
242                        let state = manager.running_graph(partial_graph_id);
243                        let (epoch, resps, _) = state
244                            .barrier_item_collector
245                            .last_collected()
246                            .expect("should exist");
247                        assert_eq!(epoch, collected.epoch);
248                        PartialGraphEvent::BarrierCollected(CollectedBarrier { epoch, resps })
249                    }
250                    PartialGraphEvent::Reset(resps) => PartialGraphEvent::Reset(resps),
251                    PartialGraphEvent::Initialized => PartialGraphEvent::Initialized,
252                    PartialGraphEvent::Error(worker_id) => PartialGraphEvent::Error(worker_id),
253                };
254                PartialGraphManagerEvent::PartialGraph(partial_graph_id, event)
255            }
256            PartialGraphManagerEvent::Worker(worker_id, event) => {
257                PartialGraphManagerEvent::Worker(worker_id, event)
258            }
259        }
260    }
261}
262
263#[derive(Debug)]
264pub(super) struct CollectedBarrier<'a> {
265    pub epoch: EpochPair,
266    pub resps: &'a HashMap<WorkerId, BarrierCompleteResponse>,
267}
268
269pub(super) enum PartialGraphEvent<'a> {
270    BarrierCollected(CollectedBarrier<'a>),
271    Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
272    Initialized,
273    Error(WorkerId),
274}
275
276pub(super) enum WorkerEvent {
277    WorkerError {
278        err: MetaError,
279        affected_partial_graphs: HashSet<PartialGraphId>,
280    },
281    WorkerConnected,
282}
283
284fn existing_graphs(
285    graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
286) -> impl Iterator<Item = PartialGraphId> + '_ {
287    graphs
288        .iter()
289        .filter_map(|(partial_graph_id, status)| match status {
290            PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
291                Some(*partial_graph_id)
292            }
293            PartialGraphStatus::Resetting(_) => None,
294        })
295}
296
297pub(super) struct PartialGraphManager {
298    control_stream_manager: ControlStreamManager,
299    term_id: String,
300    graphs: HashMap<PartialGraphId, PartialGraphStatus>,
301}
302
303impl PartialGraphManager {
304    pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
305        Self {
306            control_stream_manager: ControlStreamManager::new(env),
307            term_id: "uninitialized".to_owned(),
308            graphs: HashMap::new(),
309        }
310    }
311
312    pub(super) async fn recover(
313        env: MetaSrvEnv,
314        nodes: &HashMap<WorkerId, WorkerNode>,
315        context: Arc<impl GlobalBarrierWorkerContext>,
316    ) -> Self {
317        let term_id = Uuid::new_v4().to_string();
318        let control_stream_manager =
319            ControlStreamManager::recover(env, nodes, &term_id, context).await;
320        Self {
321            control_stream_manager,
322            term_id,
323            graphs: Default::default(),
324        }
325    }
326
327    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
328        &self.control_stream_manager
329    }
330
331    pub(super) async fn add_worker(
332        &mut self,
333        node: WorkerNode,
334        context: &impl GlobalBarrierWorkerContext,
335    ) {
336        self.control_stream_manager
337            .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
338            .await
339    }
340
341    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
342        self.control_stream_manager.remove_worker(node);
343    }
344
345    pub(super) fn clear_worker(&mut self) {
346        self.control_stream_manager.clear();
347    }
348
349    pub(crate) fn notify_all_err(&mut self, err: &MetaError) {
350        for (_, graph) in self.graphs.drain() {
351            if let PartialGraphStatus::Running(graph) = graph {
352                for info in graph.barrier_item_collector.into_infos() {
353                    for notifier in info.notifiers {
354                        notifier.notify_collection_failed(err.clone());
355                    }
356                }
357            }
358        }
359    }
360}
361
362#[must_use]
363pub(super) struct PartialGraphAdder<'a> {
364    partial_graph_id: PartialGraphId,
365    manager: &'a mut PartialGraphManager,
366    consumed: bool,
367}
368
369impl PartialGraphAdder<'_> {
370    pub(super) fn added(mut self) {
371        self.consumed = true;
372    }
373
374    pub(super) fn failed(mut self) {
375        self.manager.reset_partial_graphs([self.partial_graph_id]);
376        self.consumed = true;
377    }
378
379    pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
380        self.manager
381    }
382}
383
384impl Drop for PartialGraphAdder<'_> {
385    fn drop(&mut self) {
386        debug_assert!(self.consumed, "unconsumed graph adder");
387        if !self.consumed {
388            warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
389        }
390    }
391}
392
393impl PartialGraphManager {
394    pub(super) fn add_partial_graph(
395        &mut self,
396        partial_graph_id: PartialGraphId,
397        stat: impl PartialGraphStat,
398    ) -> PartialGraphAdder<'_> {
399        self.graphs
400            .try_insert(
401                partial_graph_id,
402                PartialGraphStatus::Running(PartialGraphRunningState::new(Box::new(stat))),
403            )
404            .expect("non-duplicated");
405        self.control_stream_manager
406            .add_partial_graph(partial_graph_id);
407        PartialGraphAdder {
408            partial_graph_id,
409            manager: self,
410            consumed: false,
411        }
412    }
413
414    pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
415        for partial_graph_id in &partial_graphs {
416            let graph = self.graphs.remove(partial_graph_id).expect("should exist");
417            let PartialGraphStatus::Running(state) = graph else {
418                panic!("graph to be explicitly removed should be running");
419            };
420            assert!(state.is_empty());
421        }
422        self.control_stream_manager
423            .remove_partial_graphs(partial_graphs);
424    }
425
426    pub(super) fn reset_partial_graphs(
427        &mut self,
428        partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
429    ) {
430        let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
431        let remaining_workers = self
432            .control_stream_manager
433            .reset_partial_graphs(partial_graph_ids.clone());
434        let new_collector = || ResetPartialGraphCollector {
435            remaining_workers: remaining_workers.clone(),
436            reset_resps: Default::default(),
437        };
438        for partial_graph_id in partial_graph_ids {
439            match self.graphs.entry(partial_graph_id) {
440                Entry::Vacant(entry) => {
441                    entry.insert(PartialGraphStatus::Resetting(new_collector()));
442                }
443                Entry::Occupied(mut entry) => {
444                    let graph = entry.get_mut();
445                    match graph {
446                        PartialGraphStatus::Resetting(_) => {
447                            unreachable!("should not reset again")
448                        }
449                        PartialGraphStatus::Running(_)
450                        | PartialGraphStatus::Initializing { .. } => {
451                            *graph = PartialGraphStatus::Resetting(new_collector());
452                        }
453                    }
454                }
455            }
456        }
457    }
458
459    pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
460        let graph = self.graphs.get(&partial_graph_id).expect("should exist");
461        let PartialGraphStatus::Resetting(..) = graph else {
462            panic!("should be at resetting but at {:?}", graph);
463        };
464    }
465
466    pub(super) fn inject_barrier(
467        &mut self,
468        partial_graph_id: PartialGraphId,
469        mutation: Option<Mutation>,
470        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
471        table_ids_to_sync: impl Iterator<Item = TableId>,
472        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
473        new_actors: Option<StreamJobActorsToCreate>,
474        info: PartialGraphBarrierInfo,
475    ) -> MetaResult<()> {
476        let graph = self
477            .graphs
478            .get_mut(&partial_graph_id)
479            .expect("should exist");
480        let node_to_collect = self.control_stream_manager.inject_barrier(
481            partial_graph_id,
482            mutation,
483            &info.barrier_info,
484            node_actors,
485            table_ids_to_sync,
486            nodes_to_sync_table,
487            new_actors,
488        )?;
489        let PartialGraphStatus::Running(state) = graph else {
490            panic!("should not inject barrier on non-running status: {graph:?}")
491        };
492        state.enqueue(node_to_collect, info);
493        Ok(())
494    }
495
496    fn running_graph(&self, partial_graph_id: PartialGraphId) -> &PartialGraphRunningState {
497        let PartialGraphStatus::Running(graph) = &self.graphs[&partial_graph_id] else {
498            unreachable!("should be running")
499        };
500        graph
501    }
502
503    fn running_graph_mut(
504        &mut self,
505        partial_graph_id: PartialGraphId,
506    ) -> &mut PartialGraphRunningState {
507        let PartialGraphStatus::Running(graph) = self
508            .graphs
509            .get_mut(&partial_graph_id)
510            .expect("should exist")
511        else {
512            unreachable!("should be running")
513        };
514        graph
515    }
516
517    pub(super) fn take_collected_barrier(
518        &mut self,
519        partial_graph_id: PartialGraphId,
520        epoch: u64,
521    ) -> (Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo) {
522        let graph = self.running_graph_mut(partial_graph_id);
523        let (_, resps, info) = graph
524            .barrier_item_collector
525            .take_collected_if(|barrier_epoch| {
526                assert_eq!(barrier_epoch.prev, epoch);
527                true
528            })
529            .expect("true cond");
530        (resps.into_values().collect(), info)
531    }
532
533    pub(super) fn has_pending_checkpoint_barrier(&self, partial_graph_id: PartialGraphId) -> bool {
534        self.running_graph(partial_graph_id)
535            .barrier_item_collector
536            .iter_infos()
537            .any(|info| info.barrier_info.kind.is_checkpoint())
538    }
539
540    pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
541        PartialGraphRecoverer {
542            added_partial_graphs: Default::default(),
543            manager: self,
544            consumed: false,
545        }
546    }
547}
548
549#[must_use]
550pub(super) struct PartialGraphRecoverer<'a> {
551    added_partial_graphs: HashSet<PartialGraphId>,
552    manager: &'a mut PartialGraphManager,
553    consumed: bool,
554}
555
556impl PartialGraphRecoverer<'_> {
557    pub(super) fn recover_graph(
558        &mut self,
559        partial_graph_id: PartialGraphId,
560        mutation: Mutation,
561        barrier_info: &BarrierInfo,
562        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
563        table_ids_to_sync: impl Iterator<Item = TableId>,
564        new_actors: StreamJobActorsToCreate,
565        stat: impl PartialGraphStat,
566    ) -> MetaResult<()> {
567        assert!(
568            self.added_partial_graphs.insert(partial_graph_id),
569            "duplicated recover graph {partial_graph_id}"
570        );
571        self.manager
572            .control_stream_manager
573            .add_partial_graph(partial_graph_id);
574        assert!(barrier_info.kind.is_initial());
575        let node_to_collect = self.manager.control_stream_manager.inject_barrier(
576            partial_graph_id,
577            Some(mutation),
578            barrier_info,
579            node_actors,
580            table_ids_to_sync,
581            node_actors.keys().copied(),
582            Some(new_actors),
583        )?;
584        self.manager
585            .graphs
586            .try_insert(
587                partial_graph_id,
588                PartialGraphStatus::Initializing {
589                    epoch: barrier_info.epoch(),
590                    node_to_collect,
591                    stat: Some(Box::new(stat)),
592                },
593            )
594            .expect("non-duplicated");
595        Ok(())
596    }
597
598    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
599        &self.manager.control_stream_manager
600    }
601
602    pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
603        self.consumed = true;
604        take(&mut self.added_partial_graphs)
605    }
606
607    pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
608        self.manager
609            .reset_partial_graphs(self.added_partial_graphs.iter().copied());
610        self.consumed = true;
611        take(&mut self.added_partial_graphs)
612    }
613}
614
615impl Drop for PartialGraphRecoverer<'_> {
616    fn drop(&mut self) {
617        debug_assert!(self.consumed, "unconsumed graph recoverer");
618        if !self.consumed {
619            warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
620        }
621    }
622}
623
624#[must_use]
625pub(super) enum PartialGraphManagerEvent<'a> {
626    PartialGraph(PartialGraphId, PartialGraphEvent<'a>),
627    Worker(WorkerId, WorkerEvent),
628}
629
630impl PartialGraphManager {
631    pub(super) async fn next_event<'a>(
632        &'a mut self,
633        context: &Arc<impl GlobalBarrierWorkerContext>,
634    ) -> PartialGraphManagerEvent<'a> {
635        let temp_ref = CollectedBarrierTempRef::new();
636        let event = self.next_event_inner(context, &temp_ref).await;
637        temp_ref.correct_lifetime(event, self)
638    }
639
640    async fn next_event_inner<'a>(
641        &mut self,
642        context: &Arc<impl GlobalBarrierWorkerContext>,
643        temp_ref: &'a CollectedBarrierTempRef,
644    ) -> PartialGraphManagerEvent<'a> {
645        for (&partial_graph_id, graph) in &mut self.graphs {
646            match graph {
647                PartialGraphStatus::Running(state) => {
648                    if let Some(collected) = state.barrier_collected(temp_ref) {
649                        return PartialGraphManagerEvent::PartialGraph(
650                            partial_graph_id,
651                            PartialGraphEvent::BarrierCollected(collected),
652                        );
653                    }
654                }
655                PartialGraphStatus::Resetting(collector) => {
656                    if collector.remaining_workers.is_empty() {
657                        let resps = take(&mut collector.reset_resps);
658                        self.graphs.remove(&partial_graph_id);
659                        return PartialGraphManagerEvent::PartialGraph(
660                            partial_graph_id,
661                            PartialGraphEvent::Reset(resps),
662                        );
663                    }
664                }
665                PartialGraphStatus::Initializing {
666                    node_to_collect,
667                    stat,
668                    ..
669                } => {
670                    if node_to_collect.is_empty() {
671                        *graph = PartialGraphStatus::Running(PartialGraphRunningState::new(
672                            stat.take().expect("should be taken once"),
673                        ));
674                        return PartialGraphManagerEvent::PartialGraph(
675                            partial_graph_id,
676                            PartialGraphEvent::Initialized,
677                        );
678                    }
679                }
680            }
681        }
682        loop {
683            let (worker_id, event) = self
684                .control_stream_manager
685                .next_event(&self.term_id, context)
686                .await;
687            match event {
688                WorkerNodeEvent::Response(result) => match result {
689                    Ok(resp) => match resp {
690                        Response::CompleteBarrier(resp) => {
691                            let partial_graph_id = resp.partial_graph_id;
692                            if let Some(event) = self
693                                .graphs
694                                .get_mut(&partial_graph_id)
695                                .expect("should exist")
696                                .collect(worker_id, resp, temp_ref)
697                            {
698                                return PartialGraphManagerEvent::PartialGraph(
699                                    partial_graph_id,
700                                    event,
701                                );
702                            }
703                        }
704                        Response::ReportPartialGraphFailure(resp) => {
705                            let partial_graph_id = resp.partial_graph_id;
706                            let graph = self
707                                .graphs
708                                .get_mut(&partial_graph_id)
709                                .expect("should exist");
710                            match graph {
711                                PartialGraphStatus::Resetting(_) => {
712                                    // ignore reported error when resetting
713                                }
714                                PartialGraphStatus::Running(_)
715                                | PartialGraphStatus::Initializing { .. } => {
716                                    return PartialGraphManagerEvent::PartialGraph(
717                                        partial_graph_id,
718                                        PartialGraphEvent::Error(worker_id),
719                                    );
720                                }
721                            }
722                        }
723                        Response::ResetPartialGraph(resp) => {
724                            let partial_graph_id = resp.partial_graph_id;
725                            let graph = self
726                                .graphs
727                                .get_mut(&partial_graph_id)
728                                .expect("should exist");
729                            match graph {
730                                PartialGraphStatus::Running(_)
731                                | PartialGraphStatus::Initializing { .. } => {
732                                    if cfg!(debug_assertions) {
733                                        unreachable!(
734                                            "should not have reset request when not in resetting state"
735                                        )
736                                    } else {
737                                        warn!(
738                                            ?resp,
739                                            "ignore reset resp when not in Resetting state"
740                                        );
741                                    }
742                                }
743                                PartialGraphStatus::Resetting(collector) => {
744                                    if collector.collect(worker_id, resp) {
745                                        let resps = take(&mut collector.reset_resps);
746                                        self.graphs.remove(&partial_graph_id);
747                                        return PartialGraphManagerEvent::PartialGraph(
748                                            partial_graph_id,
749                                            PartialGraphEvent::Reset(resps),
750                                        );
751                                    }
752                                }
753                            }
754                        }
755                        Response::Init(_) | Response::Shutdown(_) => {
756                            unreachable!("should be handled in control stream manager")
757                        }
758                    },
759                    Err(error) => {
760                        let affected_partial_graphs = self
761                            .graphs
762                            .iter_mut()
763                            .filter_map(|(partial_graph_id, graph)| match graph {
764                                PartialGraphStatus::Running(state) => state
765                                    .barrier_item_collector
766                                    .iter_to_collect()
767                                    .any(|to_collect| {
768                                        !is_valid_after_worker_err(to_collect, worker_id)
769                                    })
770                                    .then_some(*partial_graph_id),
771                                PartialGraphStatus::Resetting(collector) => {
772                                    collector.remaining_workers.remove(&worker_id);
773                                    None
774                                }
775                                PartialGraphStatus::Initializing {
776                                    node_to_collect, ..
777                                } => (!is_valid_after_worker_err(node_to_collect, worker_id))
778                                    .then_some(*partial_graph_id),
779                            })
780                            .collect();
781                        return PartialGraphManagerEvent::Worker(
782                            worker_id,
783                            WorkerEvent::WorkerError {
784                                err: error,
785                                affected_partial_graphs,
786                            },
787                        );
788                    }
789                },
790                WorkerNodeEvent::Connected(connected) => {
791                    connected.initialize(existing_graphs(&self.graphs));
792                    return PartialGraphManagerEvent::Worker(
793                        worker_id,
794                        WorkerEvent::WorkerConnected,
795                    );
796                }
797            }
798        }
799    }
800}