1use std::collections::Bound::Unbounded;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::ops::{Bound, RangeBounds};
20use std::sync::Arc;
21use std::time::Instant;
22
23use educe::Educe;
24use itertools::Itertools;
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_pb::common::WorkerNode;
27use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
28use risingwave_pb::stream_plan::barrier_mutation::Mutation;
29use risingwave_pb::stream_service::BarrierCompleteResponse;
30use risingwave_pb::stream_service::streaming_control_stream_response::{
31 ResetPartialGraphResponse, Response,
32};
33use tracing::{debug, warn};
34use uuid::Uuid;
35
36use crate::barrier::BarrierKind;
37use crate::barrier::command::PostCollectCommand;
38use crate::barrier::context::GlobalBarrierWorkerContext;
39use crate::barrier::info::BarrierInfo;
40use crate::barrier::notifier::Notifier;
41use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
42use crate::barrier::utils::{BarrierItemCollector, NodeToCollect, is_valid_after_worker_err};
43use crate::manager::MetaSrvEnv;
44use crate::model::StreamJobActorsToCreate;
45use crate::{MetaError, MetaResult};
46
/// Per-barrier bookkeeping kept for a partial graph from the moment the barrier is
/// enqueued until it is completed or reported as failed.
#[derive(Debug)]
pub(super) struct PartialGraphBarrierInfo {
    /// Instant at which this info was constructed; `elapsed_secs` measures from here.
    enqueue_time: Instant,
    /// Command associated with the barrier. Its `should_checkpoint()` is consulted when
    /// the barrier is enqueued and again when it is picked for completion.
    pub(super) post_collect_command: PostCollectCommand,
    /// Epoch and kind of the barrier.
    pub(super) barrier_info: BarrierInfo,
    /// Notifiers informed when the barrier is started, collected, or fails.
    pub(super) notifiers: Vec<Notifier>,
    /// NOTE(review): presumably the tables to commit with this barrier; this file only
    /// stores the set and never reads it.
    pub(super) table_ids_to_commit: HashSet<TableId>,
}
55
56impl PartialGraphBarrierInfo {
57 pub(super) fn new(
58 post_collect_command: PostCollectCommand,
59 barrier_info: BarrierInfo,
60 notifiers: Vec<Notifier>,
61 table_ids_to_commit: HashSet<TableId>,
62 ) -> Self {
63 Self {
64 enqueue_time: Instant::now(),
65 post_collect_command,
66 barrier_info,
67 notifiers,
68 table_ids_to_commit,
69 }
70 }
71
72 pub(super) fn elapsed_secs(&self) -> f64 {
73 self.enqueue_time.elapsed().as_secs_f64()
74 }
75}
76
/// Sink for the barrier metrics of a single partial graph.
pub(super) trait PartialGraphStat: Send + Sync + 'static {
    /// Called when a barrier is found fully collected, with the barrier's epoch and the
    /// seconds elapsed since its `PartialGraphBarrierInfo` was created.
    fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64);
    /// Called after a barrier is enqueued or found collected, with the collector's
    /// current in-flight and collected barrier counts.
    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize);
}
81
/// State of a partial graph that accepts new barriers and collects their responses.
#[derive(Educe)]
#[educe(Debug)]
struct PartialGraphRunningState {
    /// Tracks enqueued barriers together with the per-worker completion responses.
    barrier_item_collector:
        BarrierItemCollector<WorkerId, BarrierCompleteResponse, PartialGraphBarrierInfo>,
    /// Previous epoch of the barrier handed out by `start_completing` and not yet
    /// acknowledged through `ack_completed`; `None` otherwise.
    completing_epoch: Option<u64>,
    /// Metrics sink; excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    stat: Box<dyn PartialGraphStat>,
}
91
92impl PartialGraphRunningState {
93 fn new(stat: Box<dyn PartialGraphStat>) -> Self {
94 Self {
95 barrier_item_collector: BarrierItemCollector::new(),
96 completing_epoch: None,
97 stat,
98 }
99 }
100
101 fn is_empty(&self) -> bool {
102 self.barrier_item_collector.is_empty() && self.completing_epoch.is_none()
103 }
104
105 fn enqueue(&mut self, node_to_collect: NodeToCollect, mut info: PartialGraphBarrierInfo) {
106 let epoch = info.barrier_info.epoch();
107 assert_ne!(info.barrier_info.kind, BarrierKind::Initial);
108 if info.post_collect_command.should_checkpoint() {
109 assert!(info.barrier_info.kind.is_checkpoint());
110 }
111 info.notifiers.iter_mut().for_each(|n| n.notify_started());
112 self.barrier_item_collector
113 .enqueue(epoch, node_to_collect, info);
114 self.stat.observe_barrier_num(
115 self.barrier_item_collector.inflight_barrier_num(),
116 self.barrier_item_collector.collected_barrier_num(),
117 );
118 }
119
120 fn collect(&mut self, resp: BarrierCompleteResponse) {
121 debug!(
122 epoch = resp.epoch,
123 worker_id = %resp.worker_id,
124 partial_graph_id = %resp.partial_graph_id,
125 "collect barrier from worker"
126 );
127 self.barrier_item_collector
128 .collect(resp.epoch, resp.worker_id, resp);
129 }
130
131 fn barrier_collected<'a>(
132 &mut self,
133 temp_ref: &'a CollectedBarrierTempRef,
134 ) -> Option<CollectedBarrier<'a>> {
135 if let Some((epoch, info)) = self.barrier_item_collector.barrier_collected() {
136 self.stat
137 .observe_barrier_latency(epoch, info.elapsed_secs());
138 self.stat.observe_barrier_num(
139 self.barrier_item_collector.inflight_barrier_num(),
140 self.barrier_item_collector.collected_barrier_num(),
141 );
142 Some(temp_ref.collected_barrier(epoch))
143 } else {
144 None
145 }
146 }
147}
148
/// Gathers the reset responses of a partial graph that is being reset.
#[derive(Debug)]
struct ResetPartialGraphCollector {
    /// Workers whose reset response is still awaited.
    remaining_workers: HashSet<WorkerId>,
    /// Responses received so far, keyed by the responding worker.
    reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
}
154
155impl ResetPartialGraphCollector {
156 fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
157 assert!(self.remaining_workers.remove(&worker_id));
158 self.reset_resps
159 .try_insert(worker_id, resp)
160 .expect("non-duplicate");
161 self.remaining_workers.is_empty()
162 }
163}
164
/// Lifecycle state of a partial graph tracked by `PartialGraphManager`.
#[derive(Educe)]
#[educe(Debug)]
enum PartialGraphStatus {
    /// The graph accepts barriers and collects their responses.
    Running(PartialGraphRunningState),
    /// A reset was requested; reset responses from workers are being gathered.
    Resetting(ResetPartialGraphCollector),
    /// The initial barrier was injected during recovery and is still being collected.
    Initializing {
        /// Epoch of the initial barrier; its `prev` must match incoming responses.
        epoch: EpochPair,
        /// Workers that have not yet reported the initial barrier.
        node_to_collect: NodeToCollect,
        /// Metrics sink handed to the running state once initialization finishes;
        /// taken exactly once and excluded from the `Debug` output.
        #[educe(Debug(ignore))]
        stat: Option<Box<dyn PartialGraphStat>>,
    },
}
177
178impl PartialGraphStatus {
179 fn collect<'a>(
180 &mut self,
181 worker_id: WorkerId,
182 resp: BarrierCompleteResponse,
183 temp_ref: &'a CollectedBarrierTempRef,
184 ) -> Option<PartialGraphEvent<'a>> {
185 assert_eq!(worker_id, resp.worker_id);
186 match self {
187 PartialGraphStatus::Running(state) => {
188 state.collect(resp);
189 state
190 .barrier_collected(temp_ref)
191 .map(PartialGraphEvent::BarrierCollected)
192 }
193 PartialGraphStatus::Resetting(_) => None,
194 PartialGraphStatus::Initializing {
195 epoch,
196 node_to_collect,
197 stat,
198 } => {
199 assert_eq!(epoch.prev, resp.epoch);
200 assert!(node_to_collect.remove(&worker_id));
201 if node_to_collect.is_empty() {
202 *self = PartialGraphStatus::Running(PartialGraphRunningState::new(
203 stat.take().expect("should be taken for once"),
204 ));
205 Some(PartialGraphEvent::Initialized)
206 } else {
207 None
208 }
209 }
210 }
211 }
212}
213
/// Placeholder source of response maps for `CollectedBarrier` values created while the
/// manager is still mutably borrowed. `correct_lifetime` later replaces the placeholder
/// with the real responses borrowed from the manager.
struct CollectedBarrierTempRef {
    /// Reference to a static, empty response map.
    resps: &'static HashMap<WorkerId, BarrierCompleteResponse>,
}
222
223impl CollectedBarrierTempRef {
224 fn new() -> Self {
225 static EMPTY: std::sync::LazyLock<HashMap<WorkerId, BarrierCompleteResponse>> =
226 std::sync::LazyLock::new(HashMap::new);
227 CollectedBarrierTempRef { resps: &EMPTY }
228 }
229
230 fn collected_barrier(&self, epoch: EpochPair) -> CollectedBarrier<'_> {
231 CollectedBarrier {
232 epoch,
233 resps: self.resps,
234 }
235 }
236
237 fn correct_lifetime<'a>(
238 &self,
239 event: PartialGraphManagerEvent<'_>,
240 manager: &'a PartialGraphManager,
241 ) -> PartialGraphManagerEvent<'a> {
242 match event {
243 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
244 let event = match event {
245 PartialGraphEvent::BarrierCollected(collected) => {
246 let state = manager.running_graph(partial_graph_id);
247 let (epoch, resps, _) = state
248 .barrier_item_collector
249 .last_collected()
250 .expect("should exist");
251 assert_eq!(epoch, collected.epoch);
252 PartialGraphEvent::BarrierCollected(CollectedBarrier { epoch, resps })
253 }
254 PartialGraphEvent::Reset(resps) => PartialGraphEvent::Reset(resps),
255 PartialGraphEvent::Initialized => PartialGraphEvent::Initialized,
256 PartialGraphEvent::Error(worker_id) => PartialGraphEvent::Error(worker_id),
257 };
258 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event)
259 }
260 PartialGraphManagerEvent::Worker(worker_id, event) => {
261 PartialGraphManagerEvent::Worker(worker_id, event)
262 }
263 }
264 }
265}
266
/// A barrier for which the collector reported that collection has finished.
#[derive(Debug)]
pub(super) struct CollectedBarrier<'a> {
    /// Epoch of the collected barrier.
    pub epoch: EpochPair,
    /// Completion responses of the barrier, keyed by worker.
    pub resps: &'a HashMap<WorkerId, BarrierCompleteResponse>,
}
272
/// Event concerning a single partial graph.
pub(super) enum PartialGraphEvent<'a> {
    /// A barrier of the graph has been fully collected.
    BarrierCollected(CollectedBarrier<'a>),
    /// The reset of the graph has finished; carries the reset responses received, keyed
    /// by worker. The graph is no longer tracked once this is emitted.
    Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
    /// The initial barrier has been collected and the graph is now running.
    Initialized,
    /// The given worker reported a failure for a graph that is not being reset.
    Error(WorkerId),
}
279
/// Event concerning a worker node rather than a specific partial graph.
pub(super) enum WorkerEvent {
    /// The worker's control stream yielded an error.
    WorkerError {
        /// The reported error.
        err: MetaError,
        /// Running or initializing graphs for which `is_valid_after_worker_err`
        /// returned `false` on a pending collection set.
        affected_partial_graphs: HashSet<PartialGraphId>,
    },
    /// The worker connected and was initialized with the existing graphs.
    WorkerConnected,
}
287
288fn existing_graphs(
289 graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
290) -> impl Iterator<Item = PartialGraphId> + '_ {
291 graphs
292 .iter()
293 .filter_map(|(partial_graph_id, status)| match status {
294 PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
295 Some(*partial_graph_id)
296 }
297 PartialGraphStatus::Resetting(_) => None,
298 })
299}
300
/// Tracks the lifecycle state of every partial graph and drives the control streams to
/// the workers.
pub(super) struct PartialGraphManager {
    /// Control streams to the worker nodes.
    control_stream_manager: ControlStreamManager,
    /// Identifier passed to the control stream manager; `"uninitialized"` before
    /// recovery, a freshly generated UUID afterwards.
    term_id: String,
    /// Current status of each tracked partial graph.
    graphs: HashMap<PartialGraphId, PartialGraphStatus>,
}
306
307impl PartialGraphManager {
308 pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
309 Self {
310 control_stream_manager: ControlStreamManager::new(env),
311 term_id: "uninitialized".to_owned(),
312 graphs: HashMap::new(),
313 }
314 }
315
316 pub(super) async fn recover(
317 env: MetaSrvEnv,
318 nodes: &HashMap<WorkerId, WorkerNode>,
319 context: Arc<impl GlobalBarrierWorkerContext>,
320 ) -> Self {
321 let term_id = Uuid::new_v4().to_string();
322 let control_stream_manager =
323 ControlStreamManager::recover(env, nodes, &term_id, context).await;
324 Self {
325 control_stream_manager,
326 term_id,
327 graphs: Default::default(),
328 }
329 }
330
331 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
332 &self.control_stream_manager
333 }
334
335 pub(super) async fn add_worker(
336 &mut self,
337 node: WorkerNode,
338 context: &impl GlobalBarrierWorkerContext,
339 ) {
340 self.control_stream_manager
341 .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
342 .await
343 }
344
345 pub(super) fn remove_worker(&mut self, node: WorkerNode) {
346 self.control_stream_manager.remove_worker(node);
347 }
348
349 pub(super) fn clear_worker(&mut self) {
350 self.control_stream_manager.clear();
351 }
352
353 pub(crate) fn notify_all_err(&mut self, err: &MetaError) {
354 for (_, graph) in self.graphs.drain() {
355 if let PartialGraphStatus::Running(graph) = graph {
356 for info in graph.barrier_item_collector.into_infos() {
357 for notifier in info.notifiers {
358 notifier.notify_collection_failed(err.clone());
359 }
360 }
361 }
362 }
363 }
364}
365
/// Guard returned by `PartialGraphManager::add_partial_graph`. It must be resolved with
/// `added()` or `failed()`; dropping it unresolved is reported as a misuse.
#[must_use]
pub(super) struct PartialGraphAdder<'a> {
    /// Id of the graph that was just added.
    partial_graph_id: PartialGraphId,
    /// Manager the graph was added to.
    manager: &'a mut PartialGraphManager,
    /// Set once `added()` or `failed()` has been called.
    consumed: bool,
}
372
373impl PartialGraphAdder<'_> {
374 pub(super) fn added(mut self) {
375 self.consumed = true;
376 }
377
378 pub(super) fn failed(mut self) {
379 self.manager.reset_partial_graphs([self.partial_graph_id]);
380 self.consumed = true;
381 }
382
383 pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
384 self.manager
385 }
386}
387
388impl Drop for PartialGraphAdder<'_> {
389 fn drop(&mut self) {
390 debug_assert!(self.consumed, "unconsumed graph adder");
391 if !self.consumed {
392 warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
393 }
394 }
395}
396
397impl PartialGraphManager {
398 pub(super) fn add_partial_graph(
399 &mut self,
400 partial_graph_id: PartialGraphId,
401 stat: impl PartialGraphStat,
402 ) -> PartialGraphAdder<'_> {
403 self.graphs
404 .try_insert(
405 partial_graph_id,
406 PartialGraphStatus::Running(PartialGraphRunningState::new(Box::new(stat))),
407 )
408 .expect("non-duplicated");
409 self.control_stream_manager
410 .add_partial_graph(partial_graph_id);
411 PartialGraphAdder {
412 partial_graph_id,
413 manager: self,
414 consumed: false,
415 }
416 }
417
418 pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
419 for partial_graph_id in &partial_graphs {
420 let graph = self.graphs.remove(partial_graph_id).expect("should exist");
421 let PartialGraphStatus::Running(state) = graph else {
422 panic!("graph to be explicitly removed should be running");
423 };
424 assert!(state.is_empty());
425 }
426 self.control_stream_manager
427 .remove_partial_graphs(partial_graphs);
428 }
429
430 pub(super) fn reset_partial_graphs(
431 &mut self,
432 partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
433 ) {
434 let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
435 let remaining_workers = self
436 .control_stream_manager
437 .reset_partial_graphs(partial_graph_ids.clone());
438 let new_collector = || ResetPartialGraphCollector {
439 remaining_workers: remaining_workers.clone(),
440 reset_resps: Default::default(),
441 };
442 for partial_graph_id in partial_graph_ids {
443 match self.graphs.entry(partial_graph_id) {
444 Entry::Vacant(entry) => {
445 entry.insert(PartialGraphStatus::Resetting(new_collector()));
446 }
447 Entry::Occupied(mut entry) => {
448 let graph = entry.get_mut();
449 match graph {
450 PartialGraphStatus::Resetting(_) => {
451 unreachable!("should not reset again")
452 }
453 PartialGraphStatus::Running(_)
454 | PartialGraphStatus::Initializing { .. } => {
455 *graph = PartialGraphStatus::Resetting(new_collector());
456 }
457 }
458 }
459 }
460 }
461 }
462
463 pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
464 let graph = self.graphs.get(&partial_graph_id).expect("should exist");
465 let PartialGraphStatus::Resetting(..) = graph else {
466 panic!("should be at resetting but at {:?}", graph);
467 };
468 }
469
470 pub(super) fn inject_barrier(
471 &mut self,
472 partial_graph_id: PartialGraphId,
473 mutation: Option<Mutation>,
474 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
475 table_ids_to_sync: impl Iterator<Item = TableId>,
476 nodes_to_sync_table: impl Iterator<Item = WorkerId>,
477 new_actors: Option<StreamJobActorsToCreate>,
478 info: PartialGraphBarrierInfo,
479 ) -> MetaResult<()> {
480 let graph = self
481 .graphs
482 .get_mut(&partial_graph_id)
483 .expect("should exist");
484 let node_to_collect = self.control_stream_manager.inject_barrier(
485 partial_graph_id,
486 mutation,
487 &info.barrier_info,
488 node_actors,
489 table_ids_to_sync,
490 nodes_to_sync_table,
491 new_actors,
492 )?;
493 let PartialGraphStatus::Running(state) = graph else {
494 panic!("should not inject barrier on non-running status: {graph:?}")
495 };
496 state.enqueue(node_to_collect, info);
497 Ok(())
498 }
499
500 fn running_graph(&self, partial_graph_id: PartialGraphId) -> &PartialGraphRunningState {
501 let PartialGraphStatus::Running(graph) = &self.graphs[&partial_graph_id] else {
502 unreachable!("should be running")
503 };
504 graph
505 }
506
507 fn running_graph_mut(
508 &mut self,
509 partial_graph_id: PartialGraphId,
510 ) -> &mut PartialGraphRunningState {
511 let PartialGraphStatus::Running(graph) = self
512 .graphs
513 .get_mut(&partial_graph_id)
514 .expect("should exist")
515 else {
516 unreachable!("should be running")
517 };
518 graph
519 }
520
521 pub(super) fn inflight_barrier_num(&self, partial_graph_id: PartialGraphId) -> usize {
522 self.running_graph(partial_graph_id)
523 .barrier_item_collector
524 .inflight_barrier_num()
525 }
526
527 pub(super) fn first_inflight_barrier(
528 &self,
529 partial_graph_id: PartialGraphId,
530 ) -> Option<EpochPair> {
531 self.running_graph(partial_graph_id)
532 .barrier_item_collector
533 .first_inflight_epoch()
534 }
535
536 pub(super) fn start_completing(
537 &mut self,
538 partial_graph_id: PartialGraphId,
539 epoch_end_bound: Bound<u64>,
540 mut on_non_checkpoint_epoch: impl FnMut(
541 EpochPair,
542 HashMap<WorkerId, BarrierCompleteResponse>,
543 PostCollectCommand,
544 ),
545 ) -> Option<(
546 u64,
547 HashMap<WorkerId, BarrierCompleteResponse>,
548 PartialGraphBarrierInfo,
549 )> {
550 let graph = self.running_graph_mut(partial_graph_id);
551 assert!(graph.completing_epoch.is_none());
552 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
553 while let Some((epoch, resps, info)) = graph
554 .barrier_item_collector
555 .take_collected_if(|epoch| epoch_range.contains(&epoch.prev))
556 {
557 if info.post_collect_command.should_checkpoint() {
558 assert!(info.barrier_info.kind.is_checkpoint());
559 } else if !info.barrier_info.kind.is_checkpoint() {
560 info.notifiers
561 .into_iter()
562 .for_each(Notifier::notify_collected);
563 on_non_checkpoint_epoch(epoch, resps, info.post_collect_command);
564 continue;
565 }
566 let prev_epoch = info.barrier_info.prev_epoch();
567 graph.completing_epoch = Some(prev_epoch);
568 return Some((prev_epoch, resps, info));
569 }
570 None
571 }
572
573 pub(super) fn ack_completed(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
574 assert_eq!(
575 self.running_graph_mut(partial_graph_id)
576 .completing_epoch
577 .take(),
578 Some(prev_epoch)
579 );
580 }
581
582 pub(super) fn has_pending_checkpoint_barrier(&self, partial_graph_id: PartialGraphId) -> bool {
583 self.running_graph(partial_graph_id)
584 .barrier_item_collector
585 .iter_infos()
586 .any(|info| info.barrier_info.kind.is_checkpoint())
587 }
588
589 pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
590 PartialGraphRecoverer {
591 added_partial_graphs: Default::default(),
592 manager: self,
593 consumed: false,
594 }
595 }
596}
597
/// Guard returned by `PartialGraphManager::start_recover`. It must be resolved with
/// `all_initializing()` or `failed()`; dropping it unresolved is reported as a misuse.
#[must_use]
pub(super) struct PartialGraphRecoverer<'a> {
    /// Ids of the graphs passed to `recover_graph` so far.
    added_partial_graphs: HashSet<PartialGraphId>,
    /// Manager the graphs are recovered into.
    manager: &'a mut PartialGraphManager,
    /// Set once `all_initializing()` or `failed()` has been called.
    consumed: bool,
}
604
605impl PartialGraphRecoverer<'_> {
606 pub(super) fn recover_graph(
607 &mut self,
608 partial_graph_id: PartialGraphId,
609 mutation: Mutation,
610 barrier_info: &BarrierInfo,
611 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
612 table_ids_to_sync: impl Iterator<Item = TableId>,
613 new_actors: StreamJobActorsToCreate,
614 stat: impl PartialGraphStat,
615 ) -> MetaResult<()> {
616 assert!(
617 self.added_partial_graphs.insert(partial_graph_id),
618 "duplicated recover graph {partial_graph_id}"
619 );
620 self.manager
621 .control_stream_manager
622 .add_partial_graph(partial_graph_id);
623 assert!(barrier_info.kind.is_initial());
624 let node_to_collect = self.manager.control_stream_manager.inject_barrier(
625 partial_graph_id,
626 Some(mutation),
627 barrier_info,
628 node_actors,
629 table_ids_to_sync,
630 node_actors.keys().copied(),
631 Some(new_actors),
632 )?;
633 self.manager
634 .graphs
635 .try_insert(
636 partial_graph_id,
637 PartialGraphStatus::Initializing {
638 epoch: barrier_info.epoch(),
639 node_to_collect,
640 stat: Some(Box::new(stat)),
641 },
642 )
643 .expect("non-duplicated");
644 Ok(())
645 }
646
647 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
648 &self.manager.control_stream_manager
649 }
650
651 pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
652 self.consumed = true;
653 take(&mut self.added_partial_graphs)
654 }
655
656 pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
657 self.manager
658 .reset_partial_graphs(self.added_partial_graphs.iter().copied());
659 self.consumed = true;
660 take(&mut self.added_partial_graphs)
661 }
662}
663
664impl Drop for PartialGraphRecoverer<'_> {
665 fn drop(&mut self) {
666 debug_assert!(self.consumed, "unconsumed graph recoverer");
667 if !self.consumed {
668 warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
669 }
670 }
671}
672
/// Event produced by `PartialGraphManager::next_event`.
#[must_use]
pub(super) enum PartialGraphManagerEvent<'a> {
    /// Something happened to the partial graph with the given id.
    PartialGraph(PartialGraphId, PartialGraphEvent<'a>),
    /// Something happened to the worker with the given id.
    Worker(WorkerId, WorkerEvent),
}
678
679impl PartialGraphManager {
680 pub(super) async fn next_event<'a>(
681 &'a mut self,
682 context: &Arc<impl GlobalBarrierWorkerContext>,
683 ) -> PartialGraphManagerEvent<'a> {
684 let temp_ref = CollectedBarrierTempRef::new();
685 let event = self.next_event_inner(context, &temp_ref).await;
686 temp_ref.correct_lifetime(event, self)
687 }
688
689 async fn next_event_inner<'a>(
690 &mut self,
691 context: &Arc<impl GlobalBarrierWorkerContext>,
692 temp_ref: &'a CollectedBarrierTempRef,
693 ) -> PartialGraphManagerEvent<'a> {
694 for (&partial_graph_id, graph) in &mut self.graphs {
695 match graph {
696 PartialGraphStatus::Running(state) => {
697 if let Some(collected) = state.barrier_collected(temp_ref) {
698 return PartialGraphManagerEvent::PartialGraph(
699 partial_graph_id,
700 PartialGraphEvent::BarrierCollected(collected),
701 );
702 }
703 }
704 PartialGraphStatus::Resetting(collector) => {
705 if collector.remaining_workers.is_empty() {
706 let resps = take(&mut collector.reset_resps);
707 self.graphs.remove(&partial_graph_id);
708 return PartialGraphManagerEvent::PartialGraph(
709 partial_graph_id,
710 PartialGraphEvent::Reset(resps),
711 );
712 }
713 }
714 PartialGraphStatus::Initializing {
715 node_to_collect,
716 stat,
717 ..
718 } => {
719 if node_to_collect.is_empty() {
720 *graph = PartialGraphStatus::Running(PartialGraphRunningState::new(
721 stat.take().expect("should be taken once"),
722 ));
723 return PartialGraphManagerEvent::PartialGraph(
724 partial_graph_id,
725 PartialGraphEvent::Initialized,
726 );
727 }
728 }
729 }
730 }
731 loop {
732 let (worker_id, event) = self
733 .control_stream_manager
734 .next_event(&self.term_id, context)
735 .await;
736 match event {
737 WorkerNodeEvent::Response(result) => match result {
738 Ok(resp) => match resp {
739 Response::CompleteBarrier(resp) => {
740 let partial_graph_id = resp.partial_graph_id;
741 if let Some(event) = self
742 .graphs
743 .get_mut(&partial_graph_id)
744 .expect("should exist")
745 .collect(worker_id, resp, temp_ref)
746 {
747 return PartialGraphManagerEvent::PartialGraph(
748 partial_graph_id,
749 event,
750 );
751 }
752 }
753 Response::ReportPartialGraphFailure(resp) => {
754 let partial_graph_id = resp.partial_graph_id;
755 let graph = self
756 .graphs
757 .get_mut(&partial_graph_id)
758 .expect("should exist");
759 match graph {
760 PartialGraphStatus::Resetting(_) => {
761 }
763 PartialGraphStatus::Running(_)
764 | PartialGraphStatus::Initializing { .. } => {
765 return PartialGraphManagerEvent::PartialGraph(
766 partial_graph_id,
767 PartialGraphEvent::Error(worker_id),
768 );
769 }
770 }
771 }
772 Response::ResetPartialGraph(resp) => {
773 let partial_graph_id = resp.partial_graph_id;
774 let graph = self
775 .graphs
776 .get_mut(&partial_graph_id)
777 .expect("should exist");
778 match graph {
779 PartialGraphStatus::Running(_)
780 | PartialGraphStatus::Initializing { .. } => {
781 if cfg!(debug_assertions) {
782 unreachable!(
783 "should not have reset request when not in resetting state"
784 )
785 } else {
786 warn!(
787 ?resp,
788 "ignore reset resp when not in Resetting state"
789 );
790 }
791 }
792 PartialGraphStatus::Resetting(collector) => {
793 if collector.collect(worker_id, resp) {
794 let resps = take(&mut collector.reset_resps);
795 self.graphs.remove(&partial_graph_id);
796 return PartialGraphManagerEvent::PartialGraph(
797 partial_graph_id,
798 PartialGraphEvent::Reset(resps),
799 );
800 }
801 }
802 }
803 }
804 Response::Init(_) | Response::Shutdown(_) => {
805 unreachable!("should be handled in control stream manager")
806 }
807 },
808 Err(error) => {
809 let affected_partial_graphs = self
810 .graphs
811 .iter_mut()
812 .filter_map(|(partial_graph_id, graph)| match graph {
813 PartialGraphStatus::Running(state) => state
814 .barrier_item_collector
815 .iter_to_collect()
816 .any(|to_collect| {
817 !is_valid_after_worker_err(to_collect, worker_id)
818 })
819 .then_some(*partial_graph_id),
820 PartialGraphStatus::Resetting(collector) => {
821 collector.remaining_workers.remove(&worker_id);
822 None
823 }
824 PartialGraphStatus::Initializing {
825 node_to_collect, ..
826 } => (!is_valid_after_worker_err(node_to_collect, worker_id))
827 .then_some(*partial_graph_id),
828 })
829 .collect();
830 return PartialGraphManagerEvent::Worker(
831 worker_id,
832 WorkerEvent::WorkerError {
833 err: error,
834 affected_partial_graphs,
835 },
836 );
837 }
838 },
839 WorkerNodeEvent::Connected(connected) => {
840 connected.initialize(existing_graphs(&self.graphs));
841 return PartialGraphManagerEvent::Worker(
842 worker_id,
843 WorkerEvent::WorkerConnected,
844 );
845 }
846 }
847 }
848 }
849}