1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18use std::sync::Arc;
19use std::time::Instant;
20
21use educe::Educe;
22use itertools::Itertools;
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
26use risingwave_pb::stream_plan::barrier_mutation::Mutation;
27use risingwave_pb::stream_service::BarrierCompleteResponse;
28use risingwave_pb::stream_service::streaming_control_stream_response::{
29 ResetPartialGraphResponse, Response,
30};
31use tracing::{debug, warn};
32use uuid::Uuid;
33
34use crate::barrier::BarrierKind;
35use crate::barrier::command::PostCollectCommand;
36use crate::barrier::context::GlobalBarrierWorkerContext;
37use crate::barrier::info::BarrierInfo;
38use crate::barrier::notifier::Notifier;
39use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
40use crate::barrier::utils::{BarrierItemCollector, NodeToCollect, is_valid_after_worker_err};
41use crate::manager::MetaSrvEnv;
42use crate::model::StreamJobActorsToCreate;
43use crate::{MetaError, MetaResult};
44
45#[derive(Debug)]
46pub(super) struct PartialGraphBarrierInfo {
47 enqueue_time: Instant,
48 pub(super) post_collect_command: PostCollectCommand,
49 pub(super) barrier_info: BarrierInfo,
50 pub(super) notifiers: Vec<Notifier>,
51 pub(super) table_ids_to_commit: HashSet<TableId>,
52}
53
54impl PartialGraphBarrierInfo {
55 pub(super) fn new(
56 post_collect_command: PostCollectCommand,
57 barrier_info: BarrierInfo,
58 notifiers: Vec<Notifier>,
59 table_ids_to_commit: HashSet<TableId>,
60 ) -> Self {
61 Self {
62 enqueue_time: Instant::now(),
63 post_collect_command,
64 barrier_info,
65 notifiers,
66 table_ids_to_commit,
67 }
68 }
69
70 pub(super) fn elapsed_secs(&self) -> f64 {
71 self.enqueue_time.elapsed().as_secs_f64()
72 }
73}
74
75pub(super) trait PartialGraphStat: Send + Sync + 'static {
76 fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64);
77 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize);
78}
79
80#[derive(Educe)]
81#[educe(Debug)]
82struct PartialGraphRunningState {
83 barrier_item_collector:
84 BarrierItemCollector<WorkerId, BarrierCompleteResponse, PartialGraphBarrierInfo>,
85 #[educe(Debug(ignore))]
86 stat: Box<dyn PartialGraphStat>,
87}
88
89impl PartialGraphRunningState {
90 fn new(stat: Box<dyn PartialGraphStat>) -> Self {
91 Self {
92 barrier_item_collector: BarrierItemCollector::new(),
93 stat,
94 }
95 }
96
97 fn is_empty(&self) -> bool {
98 self.barrier_item_collector.is_empty()
99 }
100
101 fn enqueue(&mut self, node_to_collect: NodeToCollect, mut info: PartialGraphBarrierInfo) {
102 let epoch = info.barrier_info.epoch();
103 assert_ne!(info.barrier_info.kind, BarrierKind::Initial);
104 if info.post_collect_command.should_checkpoint() {
105 assert!(info.barrier_info.kind.is_checkpoint());
106 }
107 info.notifiers.iter_mut().for_each(|n| n.notify_started());
108 self.barrier_item_collector
109 .enqueue(epoch, node_to_collect, info);
110 self.stat.observe_barrier_num(
111 self.barrier_item_collector.inflight_barrier_num(),
112 self.barrier_item_collector.collected_barrier_num(),
113 );
114 }
115
116 fn collect(&mut self, resp: BarrierCompleteResponse) {
117 debug!(
118 epoch = resp.epoch,
119 worker_id = %resp.worker_id,
120 partial_graph_id = %resp.partial_graph_id,
121 "collect barrier from worker"
122 );
123 self.barrier_item_collector
124 .collect(resp.epoch, resp.worker_id, resp);
125 }
126
127 fn barrier_collected<'a>(
128 &mut self,
129 temp_ref: &'a CollectedBarrierTempRef,
130 ) -> Option<CollectedBarrier<'a>> {
131 if let Some((epoch, info)) = self.barrier_item_collector.barrier_collected() {
132 self.stat
133 .observe_barrier_latency(epoch, info.elapsed_secs());
134 self.stat.observe_barrier_num(
135 self.barrier_item_collector.inflight_barrier_num(),
136 self.barrier_item_collector.collected_barrier_num(),
137 );
138 Some(temp_ref.collected_barrier(epoch))
139 } else {
140 None
141 }
142 }
143}
144
145#[derive(Debug)]
146struct ResetPartialGraphCollector {
147 remaining_workers: HashSet<WorkerId>,
148 reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
149}
150
151impl ResetPartialGraphCollector {
152 fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
153 assert!(self.remaining_workers.remove(&worker_id));
154 self.reset_resps
155 .try_insert(worker_id, resp)
156 .expect("non-duplicate");
157 self.remaining_workers.is_empty()
158 }
159}
160
161#[derive(Educe)]
162#[educe(Debug)]
163enum PartialGraphStatus {
164 Running(PartialGraphRunningState),
165 Resetting(ResetPartialGraphCollector),
166 Initializing {
167 epoch: EpochPair,
168 node_to_collect: NodeToCollect,
169 #[educe(Debug(ignore))]
170 stat: Option<Box<dyn PartialGraphStat>>,
171 },
172}
173
174impl PartialGraphStatus {
175 fn collect<'a>(
176 &mut self,
177 worker_id: WorkerId,
178 resp: BarrierCompleteResponse,
179 temp_ref: &'a CollectedBarrierTempRef,
180 ) -> Option<PartialGraphEvent<'a>> {
181 assert_eq!(worker_id, resp.worker_id);
182 match self {
183 PartialGraphStatus::Running(state) => {
184 state.collect(resp);
185 state
186 .barrier_collected(temp_ref)
187 .map(PartialGraphEvent::BarrierCollected)
188 }
189 PartialGraphStatus::Resetting(_) => None,
190 PartialGraphStatus::Initializing {
191 epoch,
192 node_to_collect,
193 stat,
194 } => {
195 assert_eq!(epoch.prev, resp.epoch);
196 assert!(node_to_collect.remove(&worker_id));
197 if node_to_collect.is_empty() {
198 *self = PartialGraphStatus::Running(PartialGraphRunningState::new(
199 stat.take().expect("should be taken for once"),
200 ));
201 Some(PartialGraphEvent::Initialized)
202 } else {
203 None
204 }
205 }
206 }
207 }
208}
209
210struct CollectedBarrierTempRef {
216 resps: &'static HashMap<WorkerId, BarrierCompleteResponse>,
217}
218
219impl CollectedBarrierTempRef {
220 fn new() -> Self {
221 static EMPTY: std::sync::LazyLock<HashMap<WorkerId, BarrierCompleteResponse>> =
222 std::sync::LazyLock::new(HashMap::new);
223 CollectedBarrierTempRef { resps: &EMPTY }
224 }
225
226 fn collected_barrier(&self, epoch: EpochPair) -> CollectedBarrier<'_> {
227 CollectedBarrier {
228 epoch,
229 resps: self.resps,
230 }
231 }
232
233 fn correct_lifetime<'a>(
234 &self,
235 event: PartialGraphManagerEvent<'_>,
236 manager: &'a PartialGraphManager,
237 ) -> PartialGraphManagerEvent<'a> {
238 match event {
239 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
240 let event = match event {
241 PartialGraphEvent::BarrierCollected(collected) => {
242 let state = manager.running_graph(partial_graph_id);
243 let (epoch, resps, _) = state
244 .barrier_item_collector
245 .last_collected()
246 .expect("should exist");
247 assert_eq!(epoch, collected.epoch);
248 PartialGraphEvent::BarrierCollected(CollectedBarrier { epoch, resps })
249 }
250 PartialGraphEvent::Reset(resps) => PartialGraphEvent::Reset(resps),
251 PartialGraphEvent::Initialized => PartialGraphEvent::Initialized,
252 PartialGraphEvent::Error(worker_id) => PartialGraphEvent::Error(worker_id),
253 };
254 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event)
255 }
256 PartialGraphManagerEvent::Worker(worker_id, event) => {
257 PartialGraphManagerEvent::Worker(worker_id, event)
258 }
259 }
260 }
261}
262
263#[derive(Debug)]
264pub(super) struct CollectedBarrier<'a> {
265 pub epoch: EpochPair,
266 pub resps: &'a HashMap<WorkerId, BarrierCompleteResponse>,
267}
268
269pub(super) enum PartialGraphEvent<'a> {
270 BarrierCollected(CollectedBarrier<'a>),
271 Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
272 Initialized,
273 Error(WorkerId),
274}
275
276pub(super) enum WorkerEvent {
277 WorkerError {
278 err: MetaError,
279 affected_partial_graphs: HashSet<PartialGraphId>,
280 },
281 WorkerConnected,
282}
283
284fn existing_graphs(
285 graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
286) -> impl Iterator<Item = PartialGraphId> + '_ {
287 graphs
288 .iter()
289 .filter_map(|(partial_graph_id, status)| match status {
290 PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
291 Some(*partial_graph_id)
292 }
293 PartialGraphStatus::Resetting(_) => None,
294 })
295}
296
297pub(super) struct PartialGraphManager {
298 control_stream_manager: ControlStreamManager,
299 term_id: String,
300 graphs: HashMap<PartialGraphId, PartialGraphStatus>,
301}
302
303impl PartialGraphManager {
304 pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
305 Self {
306 control_stream_manager: ControlStreamManager::new(env),
307 term_id: "uninitialized".to_owned(),
308 graphs: HashMap::new(),
309 }
310 }
311
312 pub(super) async fn recover(
313 env: MetaSrvEnv,
314 nodes: &HashMap<WorkerId, WorkerNode>,
315 context: Arc<impl GlobalBarrierWorkerContext>,
316 ) -> Self {
317 let term_id = Uuid::new_v4().to_string();
318 let control_stream_manager =
319 ControlStreamManager::recover(env, nodes, &term_id, context).await;
320 Self {
321 control_stream_manager,
322 term_id,
323 graphs: Default::default(),
324 }
325 }
326
327 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
328 &self.control_stream_manager
329 }
330
331 pub(super) async fn add_worker(
332 &mut self,
333 node: WorkerNode,
334 context: &impl GlobalBarrierWorkerContext,
335 ) {
336 self.control_stream_manager
337 .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
338 .await
339 }
340
341 pub(super) fn remove_worker(&mut self, node: WorkerNode) {
342 self.control_stream_manager.remove_worker(node);
343 }
344
345 pub(super) fn clear_worker(&mut self) {
346 self.control_stream_manager.clear();
347 }
348
349 pub(crate) fn notify_all_err(&mut self, err: &MetaError) {
350 for (_, graph) in self.graphs.drain() {
351 if let PartialGraphStatus::Running(graph) = graph {
352 for info in graph.barrier_item_collector.into_infos() {
353 for notifier in info.notifiers {
354 notifier.notify_collection_failed(err.clone());
355 }
356 }
357 }
358 }
359 }
360}
361
362#[must_use]
363pub(super) struct PartialGraphAdder<'a> {
364 partial_graph_id: PartialGraphId,
365 manager: &'a mut PartialGraphManager,
366 consumed: bool,
367}
368
369impl PartialGraphAdder<'_> {
370 pub(super) fn added(mut self) {
371 self.consumed = true;
372 }
373
374 pub(super) fn failed(mut self) {
375 self.manager.reset_partial_graphs([self.partial_graph_id]);
376 self.consumed = true;
377 }
378
379 pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
380 self.manager
381 }
382}
383
384impl Drop for PartialGraphAdder<'_> {
385 fn drop(&mut self) {
386 debug_assert!(self.consumed, "unconsumed graph adder");
387 if !self.consumed {
388 warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
389 }
390 }
391}
392
393impl PartialGraphManager {
394 pub(super) fn add_partial_graph(
395 &mut self,
396 partial_graph_id: PartialGraphId,
397 stat: impl PartialGraphStat,
398 ) -> PartialGraphAdder<'_> {
399 self.graphs
400 .try_insert(
401 partial_graph_id,
402 PartialGraphStatus::Running(PartialGraphRunningState::new(Box::new(stat))),
403 )
404 .expect("non-duplicated");
405 self.control_stream_manager
406 .add_partial_graph(partial_graph_id);
407 PartialGraphAdder {
408 partial_graph_id,
409 manager: self,
410 consumed: false,
411 }
412 }
413
414 pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
415 for partial_graph_id in &partial_graphs {
416 let graph = self.graphs.remove(partial_graph_id).expect("should exist");
417 let PartialGraphStatus::Running(state) = graph else {
418 panic!("graph to be explicitly removed should be running");
419 };
420 assert!(state.is_empty());
421 }
422 self.control_stream_manager
423 .remove_partial_graphs(partial_graphs);
424 }
425
426 pub(super) fn reset_partial_graphs(
427 &mut self,
428 partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
429 ) {
430 let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
431 let remaining_workers = self
432 .control_stream_manager
433 .reset_partial_graphs(partial_graph_ids.clone());
434 let new_collector = || ResetPartialGraphCollector {
435 remaining_workers: remaining_workers.clone(),
436 reset_resps: Default::default(),
437 };
438 for partial_graph_id in partial_graph_ids {
439 match self.graphs.entry(partial_graph_id) {
440 Entry::Vacant(entry) => {
441 entry.insert(PartialGraphStatus::Resetting(new_collector()));
442 }
443 Entry::Occupied(mut entry) => {
444 let graph = entry.get_mut();
445 match graph {
446 PartialGraphStatus::Resetting(_) => {
447 unreachable!("should not reset again")
448 }
449 PartialGraphStatus::Running(_)
450 | PartialGraphStatus::Initializing { .. } => {
451 *graph = PartialGraphStatus::Resetting(new_collector());
452 }
453 }
454 }
455 }
456 }
457 }
458
459 pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
460 let graph = self.graphs.get(&partial_graph_id).expect("should exist");
461 let PartialGraphStatus::Resetting(..) = graph else {
462 panic!("should be at resetting but at {:?}", graph);
463 };
464 }
465
466 pub(super) fn inject_barrier(
467 &mut self,
468 partial_graph_id: PartialGraphId,
469 mutation: Option<Mutation>,
470 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
471 table_ids_to_sync: impl Iterator<Item = TableId>,
472 nodes_to_sync_table: impl Iterator<Item = WorkerId>,
473 new_actors: Option<StreamJobActorsToCreate>,
474 info: PartialGraphBarrierInfo,
475 ) -> MetaResult<()> {
476 let graph = self
477 .graphs
478 .get_mut(&partial_graph_id)
479 .expect("should exist");
480 let node_to_collect = self.control_stream_manager.inject_barrier(
481 partial_graph_id,
482 mutation,
483 &info.barrier_info,
484 node_actors,
485 table_ids_to_sync,
486 nodes_to_sync_table,
487 new_actors,
488 )?;
489 let PartialGraphStatus::Running(state) = graph else {
490 panic!("should not inject barrier on non-running status: {graph:?}")
491 };
492 state.enqueue(node_to_collect, info);
493 Ok(())
494 }
495
496 fn running_graph(&self, partial_graph_id: PartialGraphId) -> &PartialGraphRunningState {
497 let PartialGraphStatus::Running(graph) = &self.graphs[&partial_graph_id] else {
498 unreachable!("should be running")
499 };
500 graph
501 }
502
503 fn running_graph_mut(
504 &mut self,
505 partial_graph_id: PartialGraphId,
506 ) -> &mut PartialGraphRunningState {
507 let PartialGraphStatus::Running(graph) = self
508 .graphs
509 .get_mut(&partial_graph_id)
510 .expect("should exist")
511 else {
512 unreachable!("should be running")
513 };
514 graph
515 }
516
517 pub(super) fn take_collected_barrier(
518 &mut self,
519 partial_graph_id: PartialGraphId,
520 epoch: u64,
521 ) -> (Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo) {
522 let graph = self.running_graph_mut(partial_graph_id);
523 let (_, resps, info) = graph
524 .barrier_item_collector
525 .take_collected_if(|barrier_epoch| {
526 assert_eq!(barrier_epoch.prev, epoch);
527 true
528 })
529 .expect("true cond");
530 (resps.into_values().collect(), info)
531 }
532
533 pub(super) fn has_pending_checkpoint_barrier(&self, partial_graph_id: PartialGraphId) -> bool {
534 self.running_graph(partial_graph_id)
535 .barrier_item_collector
536 .iter_infos()
537 .any(|info| info.barrier_info.kind.is_checkpoint())
538 }
539
540 pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
541 PartialGraphRecoverer {
542 added_partial_graphs: Default::default(),
543 manager: self,
544 consumed: false,
545 }
546 }
547}
548
549#[must_use]
550pub(super) struct PartialGraphRecoverer<'a> {
551 added_partial_graphs: HashSet<PartialGraphId>,
552 manager: &'a mut PartialGraphManager,
553 consumed: bool,
554}
555
556impl PartialGraphRecoverer<'_> {
557 pub(super) fn recover_graph(
558 &mut self,
559 partial_graph_id: PartialGraphId,
560 mutation: Mutation,
561 barrier_info: &BarrierInfo,
562 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
563 table_ids_to_sync: impl Iterator<Item = TableId>,
564 new_actors: StreamJobActorsToCreate,
565 stat: impl PartialGraphStat,
566 ) -> MetaResult<()> {
567 assert!(
568 self.added_partial_graphs.insert(partial_graph_id),
569 "duplicated recover graph {partial_graph_id}"
570 );
571 self.manager
572 .control_stream_manager
573 .add_partial_graph(partial_graph_id);
574 assert!(barrier_info.kind.is_initial());
575 let node_to_collect = self.manager.control_stream_manager.inject_barrier(
576 partial_graph_id,
577 Some(mutation),
578 barrier_info,
579 node_actors,
580 table_ids_to_sync,
581 node_actors.keys().copied(),
582 Some(new_actors),
583 )?;
584 self.manager
585 .graphs
586 .try_insert(
587 partial_graph_id,
588 PartialGraphStatus::Initializing {
589 epoch: barrier_info.epoch(),
590 node_to_collect,
591 stat: Some(Box::new(stat)),
592 },
593 )
594 .expect("non-duplicated");
595 Ok(())
596 }
597
598 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
599 &self.manager.control_stream_manager
600 }
601
602 pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
603 self.consumed = true;
604 take(&mut self.added_partial_graphs)
605 }
606
607 pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
608 self.manager
609 .reset_partial_graphs(self.added_partial_graphs.iter().copied());
610 self.consumed = true;
611 take(&mut self.added_partial_graphs)
612 }
613}
614
615impl Drop for PartialGraphRecoverer<'_> {
616 fn drop(&mut self) {
617 debug_assert!(self.consumed, "unconsumed graph recoverer");
618 if !self.consumed {
619 warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
620 }
621 }
622}
623
624#[must_use]
625pub(super) enum PartialGraphManagerEvent<'a> {
626 PartialGraph(PartialGraphId, PartialGraphEvent<'a>),
627 Worker(WorkerId, WorkerEvent),
628}
629
impl PartialGraphManager {
    /// Waits for the next partial-graph or worker event.
    ///
    /// While `next_event_inner` holds `&mut self`, a collected barrier can only borrow the
    /// placeholder in `temp_ref`. Afterwards, `correct_lifetime` re-reads the responses
    /// from `self`, so the returned event borrows the manager for `'a`.
    pub(super) async fn next_event<'a>(
        &'a mut self,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> PartialGraphManagerEvent<'a> {
        let temp_ref = CollectedBarrierTempRef::new();
        let event = self.next_event_inner(context, &temp_ref).await;
        temp_ref.correct_lifetime(event, self)
    }

    /// Produces the next event; a collected barrier in it borrows `temp_ref`'s placeholder.
    ///
    /// First, it reports transitions that are already due without a new response:
    /// - a running graph with a fully collected barrier;
    /// - a resetting graph with no worker left to answer (the graph is removed);
    /// - an initializing graph with no worker left to collect (it becomes running).
    ///
    /// Otherwise it waits on the control streams until a response yields an event.
    async fn next_event_inner<'a>(
        &mut self,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        temp_ref: &'a CollectedBarrierTempRef,
    ) -> PartialGraphManagerEvent<'a> {
        for (&partial_graph_id, graph) in &mut self.graphs {
            match graph {
                PartialGraphStatus::Running(state) => {
                    if let Some(collected) = state.barrier_collected(temp_ref) {
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::BarrierCollected(collected),
                        );
                    }
                }
                PartialGraphStatus::Resetting(collector) => {
                    // `remaining_workers` can become empty without a reset response, e.g.
                    // when a worker error removes the worker (see the `Err` branch below).
                    if collector.remaining_workers.is_empty() {
                        let resps = take(&mut collector.reset_resps);
                        // Mutating the map while iterating is fine: we return right away.
                        self.graphs.remove(&partial_graph_id);
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Reset(resps),
                        );
                    }
                }
                PartialGraphStatus::Initializing {
                    node_to_collect,
                    stat,
                    ..
                } => {
                    if node_to_collect.is_empty() {
                        *graph = PartialGraphStatus::Running(PartialGraphRunningState::new(
                            stat.take().expect("should be taken once"),
                        ));
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Initialized,
                        );
                    }
                }
            }
        }
        loop {
            let (worker_id, event) = self
                .control_stream_manager
                .next_event(&self.term_id, context)
                .await;
            match event {
                WorkerNodeEvent::Response(result) => match result {
                    Ok(resp) => match resp {
                        Response::CompleteBarrier(resp) => {
                            // Routed by graph status; only some responses produce an event.
                            let partial_graph_id = resp.partial_graph_id;
                            if let Some(event) = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist")
                                .collect(worker_id, resp, temp_ref)
                            {
                                return PartialGraphManagerEvent::PartialGraph(
                                    partial_graph_id,
                                    event,
                                );
                            }
                        }
                        Response::ReportPartialGraphFailure(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Resetting(_) => {
                                    // The graph is already being reset; the failure
                                    // report is ignored.
                                }
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    return PartialGraphManagerEvent::PartialGraph(
                                        partial_graph_id,
                                        PartialGraphEvent::Error(worker_id),
                                    );
                                }
                            }
                        }
                        Response::ResetPartialGraph(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    // Unexpected: fatal in debug builds, logged and ignored
                                    // in release builds.
                                    if cfg!(debug_assertions) {
                                        unreachable!(
                                            "should not have reset request when not in resetting state"
                                        )
                                    } else {
                                        warn!(
                                            ?resp,
                                            "ignore reset resp when not in Resetting state"
                                        );
                                    }
                                }
                                PartialGraphStatus::Resetting(collector) => {
                                    // Last awaited worker answered: report and forget the graph.
                                    if collector.collect(worker_id, resp) {
                                        let resps = take(&mut collector.reset_resps);
                                        self.graphs.remove(&partial_graph_id);
                                        return PartialGraphManagerEvent::PartialGraph(
                                            partial_graph_id,
                                            PartialGraphEvent::Reset(resps),
                                        );
                                    }
                                }
                            }
                        }
                        Response::Init(_) | Response::Shutdown(_) => {
                            unreachable!("should be handled in control stream manager")
                        }
                    },
                    Err(error) => {
                        // The worker's stream failed:
                        // - resetting graphs stop waiting for it;
                        // - running/initializing graphs are reported as affected when
                        //   `is_valid_after_worker_err` returns false for any barrier they
                        //   still collect.
                        let affected_partial_graphs = self
                            .graphs
                            .iter_mut()
                            .filter_map(|(partial_graph_id, graph)| match graph {
                                PartialGraphStatus::Running(state) => state
                                    .barrier_item_collector
                                    .iter_to_collect()
                                    .any(|to_collect| {
                                        !is_valid_after_worker_err(to_collect, worker_id)
                                    })
                                    .then_some(*partial_graph_id),
                                PartialGraphStatus::Resetting(collector) => {
                                    collector.remaining_workers.remove(&worker_id);
                                    None
                                }
                                PartialGraphStatus::Initializing {
                                    node_to_collect, ..
                                } => (!is_valid_after_worker_err(node_to_collect, worker_id))
                                    .then_some(*partial_graph_id),
                            })
                            .collect();
                        return PartialGraphManagerEvent::Worker(
                            worker_id,
                            WorkerEvent::WorkerError {
                                err: error,
                                affected_partial_graphs,
                            },
                        );
                    }
                },
                WorkerNodeEvent::Connected(connected) => {
                    // A (re)connected worker is told about every non-resetting graph.
                    connected.initialize(existing_graphs(&self.graphs));
                    return PartialGraphManagerEvent::Worker(
                        worker_id,
                        WorkerEvent::WorkerConnected,
                    );
                }
            }
        }
    }
}