1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::mem::take;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::common::WorkerNode;
23use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
24use risingwave_pb::stream_plan::barrier_mutation::Mutation;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use risingwave_pb::stream_service::streaming_control_stream_response::{
27 ResetPartialGraphResponse, Response,
28};
29use tracing::{debug, warn};
30use uuid::Uuid;
31
32use crate::barrier::context::GlobalBarrierWorkerContext;
33use crate::barrier::info::BarrierInfo;
34use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
35use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
36use crate::manager::MetaSrvEnv;
37use crate::model::StreamJobActorsToCreate;
38use crate::{MetaError, MetaResult};
39
/// A barrier that has been injected into a partial graph and is still waiting to be reported
/// back by the workers.
#[derive(Debug)]
struct PartialGraphInflightBarrier {
    /// Epoch pair carried by the barrier.
    epoch: EpochPair,
    /// Workers whose `BarrierCompleteResponse` has not arrived yet. The barrier counts as
    /// collected once this is empty.
    node_to_collect: NodeToCollect,
    /// Responses received so far, keyed by the worker that sent them.
    resps: HashMap<WorkerId, BarrierCompleteResponse>,
}
46
/// Bookkeeping of a running partial graph: the barriers that were injected but are not yet
/// fully collected.
#[derive(Debug)]
struct PartialGraphRunningState {
    /// In-flight barriers keyed by the `prev` epoch of each barrier. Keys are strictly
    /// increasing in injection order, so the first entry is the earliest barrier.
    inflight_barriers: BTreeMap<u64, PartialGraphInflightBarrier>,
}
52
53impl PartialGraphRunningState {
54 fn new() -> Self {
55 Self {
56 inflight_barriers: Default::default(),
57 }
58 }
59
60 fn enqueue(&mut self, epoch: EpochPair, node_to_collect: NodeToCollect) {
61 if let Some((last_prev_epoch, last_barrier)) = self.inflight_barriers.last_key_value() {
62 assert_eq!(last_barrier.epoch.curr, epoch.prev);
63 assert!(*last_prev_epoch < epoch.prev);
64 }
65 self.inflight_barriers
66 .try_insert(
67 epoch.prev,
68 PartialGraphInflightBarrier {
69 epoch,
70 node_to_collect,
71 resps: Default::default(),
72 },
73 )
74 .expect("non-duplicated");
75 }
76
77 fn collect(&mut self, resp: BarrierCompleteResponse) {
78 debug!(
79 epoch = resp.epoch,
80 worker_id = %resp.worker_id,
81 partial_graph_id = %resp.partial_graph_id,
82 "collect barrier from worker"
83 );
84 let inflight_barrier = self
85 .inflight_barriers
86 .get_mut(&resp.epoch)
87 .expect("should exist");
88 assert!(inflight_barrier.node_to_collect.remove(&resp.worker_id));
89 inflight_barrier
90 .resps
91 .try_insert(resp.worker_id, resp)
92 .expect("non-duplicate");
93 }
94
95 fn barrier_collected(&mut self) -> Option<CollectedBarrier> {
96 if let Some(entry) = self.inflight_barriers.first_entry()
97 && entry.get().node_to_collect.is_empty()
98 {
99 let PartialGraphInflightBarrier { epoch, resps, .. } = entry.remove();
100 Some(CollectedBarrier { epoch, resps })
101 } else {
102 None
103 }
104 }
105}
106
/// Gathers the `ResetPartialGraphResponse`s of the workers while a partial graph is being reset.
#[derive(Debug)]
struct ResetPartialGraphCollector {
    /// Workers whose reset response is still awaited.
    remaining_workers: HashSet<WorkerId>,
    /// Reset responses received so far, keyed by the worker that sent them.
    reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
}
112
impl ResetPartialGraphCollector {
    /// Records the reset response of `worker_id` and returns `true` once no worker is left to
    /// wait for.
    ///
    /// # Panics
    ///
    /// Panics if `worker_id` is not among the remaining workers, or if a response from it has
    /// already been recorded.
    fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
        assert!(self.remaining_workers.remove(&worker_id));
        self.reset_resps
            .try_insert(worker_id, resp)
            .expect("non-duplicate");
        self.remaining_workers.is_empty()
    }
}
122
/// Lifecycle state of a partial graph tracked by `PartialGraphManager`.
#[derive(Debug)]
enum PartialGraphStatus {
    /// Barriers can be injected into the graph and collected from it.
    Running(PartialGraphRunningState),
    /// A reset was requested; waiting for the reset responses of the remaining workers.
    Resetting(ResetPartialGraphCollector),
    /// The graph is being recovered; waiting for its initial barrier to be reported by every
    /// worker before it becomes `Running`.
    Initializing {
        /// Epoch pair of the initial barrier.
        epoch: EpochPair,
        /// Workers that still have to report the initial barrier.
        node_to_collect: NodeToCollect,
    },
}
132
133impl PartialGraphStatus {
134 fn collect(
135 &mut self,
136 worker_id: WorkerId,
137 resp: BarrierCompleteResponse,
138 ) -> Option<PartialGraphEvent> {
139 assert_eq!(worker_id, resp.worker_id);
140 match self {
141 PartialGraphStatus::Running(state) => {
142 state.collect(resp);
143 state
144 .barrier_collected()
145 .map(PartialGraphEvent::BarrierCollected)
146 }
147 PartialGraphStatus::Resetting(_) => None,
148 PartialGraphStatus::Initializing {
149 epoch,
150 node_to_collect,
151 } => {
152 assert_eq!(epoch.prev, resp.epoch);
153 assert!(node_to_collect.remove(&worker_id));
154 if node_to_collect.is_empty() {
155 *self = PartialGraphStatus::Running(PartialGraphRunningState::new());
156 Some(PartialGraphEvent::Initialized)
157 } else {
158 None
159 }
160 }
161 }
162 }
163}
164
/// A barrier that has been reported by every worker it was waiting for.
#[derive(Debug)]
pub(super) struct CollectedBarrier {
    /// Epoch pair of the collected barrier.
    pub epoch: EpochPair,
    /// The response of each worker, keyed by worker id.
    pub resps: HashMap<WorkerId, BarrierCompleteResponse>,
}
170
/// Event concerning a single partial graph.
pub(super) enum PartialGraphEvent {
    /// The earliest in-flight barrier of a running graph was reported by all workers.
    BarrierCollected(CollectedBarrier),
    /// The reset of the graph finished; carries the reset responses keyed by worker. The graph
    /// is no longer tracked when this is emitted.
    Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
    /// The initial barrier of an initializing graph was reported by all workers; the graph is
    /// now running.
    Initialized,
    /// The given worker reported a failure of the graph.
    Error(WorkerId),
}
177
/// Event concerning a single worker.
pub(super) enum WorkerEvent {
    /// The worker's control stream yielded an error.
    WorkerError {
        /// The error that was received.
        err: MetaError,
        /// Running or initializing graphs with a pending barrier for which
        /// `is_valid_after_worker_err` returned `false` for this worker.
        affected_partial_graphs: HashSet<PartialGraphId>,
    },
    /// The worker connected and was initialized with the currently existing graphs.
    WorkerConnected,
}
185
186fn existing_graphs(
187 graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
188) -> impl Iterator<Item = PartialGraphId> + '_ {
189 graphs
190 .iter()
191 .filter_map(|(partial_graph_id, status)| match status {
192 PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
193 Some(*partial_graph_id)
194 }
195 PartialGraphStatus::Resetting(_) => None,
196 })
197}
198
/// Tracks the status of every partial graph and drives barrier and reset collection through
/// the worker control streams.
pub(super) struct PartialGraphManager {
    /// Manager of the worker control streams that requests are sent through and events are
    /// received from.
    control_stream_manager: ControlStreamManager,
    /// `"uninitialized"` until `recover` assigns a freshly generated UUID string; handed to the
    /// control stream manager when adding workers and polling events.
    term_id: String,
    /// Status of every tracked partial graph.
    graphs: HashMap<PartialGraphId, PartialGraphStatus>,
}
204
205impl PartialGraphManager {
206 pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
207 Self {
208 control_stream_manager: ControlStreamManager::new(env),
209 term_id: "uninitialized".to_owned(),
210 graphs: HashMap::new(),
211 }
212 }
213
214 pub(super) async fn recover(
215 env: MetaSrvEnv,
216 nodes: &HashMap<WorkerId, WorkerNode>,
217 context: Arc<impl GlobalBarrierWorkerContext>,
218 ) -> Self {
219 let term_id = Uuid::new_v4().to_string();
220 let control_stream_manager =
221 ControlStreamManager::recover(env, nodes, &term_id, context).await;
222 Self {
223 control_stream_manager,
224 term_id,
225 graphs: Default::default(),
226 }
227 }
228
229 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
230 &self.control_stream_manager
231 }
232
233 pub(super) async fn add_worker(
234 &mut self,
235 node: WorkerNode,
236 context: &impl GlobalBarrierWorkerContext,
237 ) {
238 self.control_stream_manager
239 .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
240 .await
241 }
242
243 pub(super) fn remove_worker(&mut self, node: WorkerNode) {
244 self.control_stream_manager.remove_worker(node);
245 }
246
247 pub(super) fn clear_worker(&mut self) {
248 self.control_stream_manager.clear();
249 }
250}
251
/// Guard returned by `PartialGraphManager::add_partial_graph`.
///
/// It has to be finished with either `added` or `failed`; dropping it without doing so is
/// treated as a bug and reported from `Drop`.
#[must_use]
pub(super) struct PartialGraphAdder<'a> {
    /// The graph that was just added.
    partial_graph_id: PartialGraphId,
    /// The manager the graph was added to.
    manager: &'a mut PartialGraphManager,
    /// Set once `added` or `failed` has been called.
    consumed: bool,
}
258
259impl PartialGraphAdder<'_> {
260 pub(super) fn added(mut self) {
261 self.consumed = true;
262 }
263
264 pub(super) fn failed(mut self) {
265 self.manager.reset_partial_graphs([self.partial_graph_id]);
266 self.consumed = true;
267 }
268
269 pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
270 self.manager
271 }
272}
273
274impl Drop for PartialGraphAdder<'_> {
275 fn drop(&mut self) {
276 debug_assert!(self.consumed, "unconsumed graph adder");
277 if !self.consumed {
278 warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
279 }
280 }
281}
282
283impl PartialGraphManager {
284 pub(super) fn add_partial_graph(
285 &mut self,
286 partial_graph_id: PartialGraphId,
287 ) -> PartialGraphAdder<'_> {
288 self.graphs
289 .try_insert(
290 partial_graph_id,
291 PartialGraphStatus::Running(PartialGraphRunningState::new()),
292 )
293 .expect("non-duplicated");
294 self.control_stream_manager
295 .add_partial_graph(partial_graph_id);
296 PartialGraphAdder {
297 partial_graph_id,
298 manager: self,
299 consumed: false,
300 }
301 }
302
303 pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
304 for partial_graph_id in &partial_graphs {
305 let graph = self.graphs.remove(partial_graph_id).expect("should exist");
306 let PartialGraphStatus::Running(state) = graph else {
307 panic!("graph to be explicitly removed should be running");
308 };
309 assert!(state.inflight_barriers.is_empty());
310 }
311 self.control_stream_manager
312 .remove_partial_graphs(partial_graphs);
313 }
314
315 pub(super) fn reset_partial_graphs(
316 &mut self,
317 partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
318 ) {
319 let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
320 let remaining_workers = self
321 .control_stream_manager
322 .reset_partial_graphs(partial_graph_ids.clone());
323 let new_collector = || ResetPartialGraphCollector {
324 remaining_workers: remaining_workers.clone(),
325 reset_resps: Default::default(),
326 };
327 for partial_graph_id in partial_graph_ids {
328 match self.graphs.entry(partial_graph_id) {
329 Entry::Vacant(entry) => {
330 entry.insert(PartialGraphStatus::Resetting(new_collector()));
331 }
332 Entry::Occupied(mut entry) => {
333 let graph = entry.get_mut();
334 match graph {
335 PartialGraphStatus::Resetting(_) => {
336 unreachable!("should not reset again")
337 }
338 PartialGraphStatus::Running(_)
339 | PartialGraphStatus::Initializing { .. } => {
340 *graph = PartialGraphStatus::Resetting(new_collector());
341 }
342 }
343 }
344 }
345 }
346 }
347
348 pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
349 let graph = self.graphs.get(&partial_graph_id).expect("should exist");
350 let PartialGraphStatus::Resetting(..) = graph else {
351 panic!("should be at resetting but at {:?}", graph);
352 };
353 }
354
355 pub(super) fn inject_barrier(
356 &mut self,
357 partial_graph_id: PartialGraphId,
358 mutation: Option<Mutation>,
359 barrier_info: &BarrierInfo,
360 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
361 table_ids_to_sync: impl Iterator<Item = TableId>,
362 nodes_to_sync_table: impl Iterator<Item = WorkerId>,
363 new_actors: Option<StreamJobActorsToCreate>,
364 ) -> MetaResult<()> {
365 let graph = self
366 .graphs
367 .get_mut(&partial_graph_id)
368 .expect("should exist");
369 let node_to_collect = self.control_stream_manager.inject_barrier(
370 partial_graph_id,
371 mutation,
372 barrier_info,
373 node_actors,
374 table_ids_to_sync,
375 nodes_to_sync_table,
376 new_actors,
377 )?;
378 let PartialGraphStatus::Running(state) = graph else {
379 panic!("should not inject barrier on non-running status: {graph:?}")
380 };
381 state.enqueue(barrier_info.epoch(), node_to_collect);
382 Ok(())
383 }
384
385 pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
386 PartialGraphRecoverer {
387 added_partial_graphs: Default::default(),
388 manager: self,
389 consumed: false,
390 }
391 }
392}
393
/// Guard returned by `PartialGraphManager::start_recover` through which graphs are recovered.
///
/// It has to be finished with either `all_initializing` or `failed`; dropping it without doing
/// so is treated as a bug and reported from `Drop`.
#[must_use]
pub(super) struct PartialGraphRecoverer<'a> {
    /// Graphs for which `recover_graph` has been called in this session.
    added_partial_graphs: HashSet<PartialGraphId>,
    /// The manager the graphs are recovered into.
    manager: &'a mut PartialGraphManager,
    /// Set once `all_initializing` or `failed` has been called.
    consumed: bool,
}
400
401impl PartialGraphRecoverer<'_> {
402 pub(super) fn recover_graph(
403 &mut self,
404 partial_graph_id: PartialGraphId,
405 mutation: Mutation,
406 barrier_info: &BarrierInfo,
407 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
408 table_ids_to_sync: impl Iterator<Item = TableId>,
409 new_actors: StreamJobActorsToCreate,
410 ) -> MetaResult<()> {
411 assert!(
412 self.added_partial_graphs.insert(partial_graph_id),
413 "duplicated recover graph {partial_graph_id}"
414 );
415 self.manager
416 .control_stream_manager
417 .add_partial_graph(partial_graph_id);
418 assert!(barrier_info.kind.is_initial());
419 let node_to_collect = self.manager.control_stream_manager.inject_barrier(
420 partial_graph_id,
421 Some(mutation),
422 barrier_info,
423 node_actors,
424 table_ids_to_sync,
425 node_actors.keys().copied(),
426 Some(new_actors),
427 )?;
428 self.manager
429 .graphs
430 .try_insert(
431 partial_graph_id,
432 PartialGraphStatus::Initializing {
433 epoch: barrier_info.epoch(),
434 node_to_collect,
435 },
436 )
437 .expect("non-duplicated");
438 Ok(())
439 }
440
441 pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
442 &self.manager.control_stream_manager
443 }
444
445 pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
446 self.consumed = true;
447 take(&mut self.added_partial_graphs)
448 }
449
450 pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
451 self.manager
452 .reset_partial_graphs(self.added_partial_graphs.iter().copied());
453 self.consumed = true;
454 take(&mut self.added_partial_graphs)
455 }
456}
457
458impl Drop for PartialGraphRecoverer<'_> {
459 fn drop(&mut self) {
460 debug_assert!(self.consumed, "unconsumed graph recoverer");
461 if !self.consumed {
462 warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
463 }
464 }
465}
466
/// Event returned by `PartialGraphManager::next_event`.
#[must_use]
pub(super) enum PartialGraphManagerEvent {
    /// An event concerning the given partial graph.
    PartialGraph(PartialGraphId, PartialGraphEvent),
    /// An event concerning the given worker.
    Worker(WorkerId, WorkerEvent),
}
472
impl PartialGraphManager {
    /// Waits for the next event of any partial graph or worker.
    ///
    /// Graphs that are already able to report something (a fully collected barrier, a reset
    /// with no worker left to wait for, an initializing graph with no worker left to wait for)
    /// are reported first. Otherwise events of the control stream manager are processed until
    /// one of them produces an event to return.
    ///
    /// # Panics
    ///
    /// Panics if a response refers to a partial graph that is not tracked, or if an `Init` or
    /// `Shutdown` response reaches this method. In debug builds it also panics on a reset
    /// response for a graph that is not being reset.
    pub(super) async fn next_event(
        &mut self,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> PartialGraphManagerEvent {
        // Report what is ready without waiting for a new worker event, e.g. a resetting graph
        // whose last awaited worker was dropped by the error handling further below.
        for (&partial_graph_id, graph) in &mut self.graphs {
            match graph {
                PartialGraphStatus::Running(state) => {
                    if let Some(collected) = state.barrier_collected() {
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::BarrierCollected(collected),
                        );
                    }
                }
                PartialGraphStatus::Resetting(collector) => {
                    if collector.remaining_workers.is_empty() {
                        let resps = take(&mut collector.reset_resps);
                        // The reset is complete, so the graph is no longer tracked. Returning
                        // right away ends the iteration over the map that is modified here.
                        self.graphs.remove(&partial_graph_id);
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Reset(resps),
                        );
                    }
                }
                PartialGraphStatus::Initializing {
                    node_to_collect, ..
                } => {
                    if node_to_collect.is_empty() {
                        *graph = PartialGraphStatus::Running(PartialGraphRunningState::new());
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Initialized,
                        );
                    }
                }
            }
        }
        // Nothing is ready: process worker events until one of them yields an event.
        loop {
            let (worker_id, event) = self
                .control_stream_manager
                .next_event(&self.term_id, context)
                .await;
            match event {
                WorkerNodeEvent::Response(result) => match result {
                    Ok(resp) => match resp {
                        Response::CompleteBarrier(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            // Keep looping when the response does not complete anything yet.
                            if let Some(event) = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist")
                                .collect(worker_id, resp)
                            {
                                return PartialGraphManagerEvent::PartialGraph(
                                    partial_graph_id,
                                    event,
                                );
                            }
                        }
                        Response::ReportPartialGraphFailure(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Resetting(_) => {
                                    // A failure report for a graph that is already being
                                    // reset is ignored.
                                }
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    return PartialGraphManagerEvent::PartialGraph(
                                        partial_graph_id,
                                        PartialGraphEvent::Error(worker_id),
                                    );
                                }
                            }
                        }
                        Response::ResetPartialGraph(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    // Unexpected: fail loudly in debug builds, otherwise log
                                    // the response and ignore it.
                                    if cfg!(debug_assertions) {
                                        unreachable!(
                                            "should not have reset request when not in resetting state"
                                        )
                                    } else {
                                        warn!(
                                            ?resp,
                                            "ignore reset resp when not in Resetting state"
                                        );
                                    }
                                }
                                PartialGraphStatus::Resetting(collector) => {
                                    // `collect` returns true once every worker has responded.
                                    if collector.collect(worker_id, resp) {
                                        let resps = take(&mut collector.reset_resps);
                                        self.graphs.remove(&partial_graph_id);
                                        return PartialGraphManagerEvent::PartialGraph(
                                            partial_graph_id,
                                            PartialGraphEvent::Reset(resps),
                                        );
                                    }
                                }
                            }
                        }
                        Response::Init(_) | Response::Shutdown(_) => {
                            unreachable!("should be handled in control stream manager")
                        }
                    },
                    Err(error) => {
                        // Determine which graphs are affected by the error of this worker.
                        let affected_partial_graphs = self
                            .graphs
                            .iter_mut()
                            .filter_map(|(partial_graph_id, graph)| match graph {
                                PartialGraphStatus::Running(state) => state
                                    .inflight_barriers
                                    .values()
                                    .any(|inflight_barrier| {
                                        !is_valid_after_worker_err(
                                            &inflight_barrier.node_to_collect,
                                            worker_id,
                                        )
                                    })
                                    .then_some(*partial_graph_id),
                                PartialGraphStatus::Resetting(collector) => {
                                    // A resetting graph stops waiting for this worker. If that
                                    // leaves no worker to wait for, the reset is reported by
                                    // the scan at the top of the next call.
                                    collector.remaining_workers.remove(&worker_id);
                                    None
                                }
                                PartialGraphStatus::Initializing {
                                    node_to_collect, ..
                                } => (!is_valid_after_worker_err(node_to_collect, worker_id))
                                    .then_some(*partial_graph_id),
                            })
                            .collect();
                        return PartialGraphManagerEvent::Worker(
                            worker_id,
                            WorkerEvent::WorkerError {
                                err: error,
                                affected_partial_graphs,
                            },
                        );
                    }
                },
                WorkerNodeEvent::Connected(connected) => {
                    // Initialize the newly connected worker with the graphs that are running
                    // or initializing.
                    connected.initialize(existing_graphs(&self.graphs));
                    return PartialGraphManagerEvent::Worker(
                        worker_id,
                        WorkerEvent::WorkerConnected,
                    );
                }
            }
        }
    }
}