risingwave_meta/barrier/
partial_graph.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::mem::take;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::common::WorkerNode;
23use risingwave_pb::id::{ActorId, PartialGraphId, TableId, WorkerId};
24use risingwave_pb::stream_plan::barrier_mutation::Mutation;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use risingwave_pb::stream_service::streaming_control_stream_response::{
27    ResetPartialGraphResponse, Response,
28};
29use tracing::{debug, warn};
30use uuid::Uuid;
31
32use crate::barrier::context::GlobalBarrierWorkerContext;
33use crate::barrier::info::BarrierInfo;
34use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent};
35use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
36use crate::manager::MetaSrvEnv;
37use crate::model::StreamJobActorsToCreate;
38use crate::{MetaError, MetaResult};
39
/// A barrier injected into one partial graph that has not yet been collected from
/// every worker it is waiting on.
#[derive(Debug)]
struct PartialGraphInflightBarrier {
    /// Epoch pair of the barrier; `epoch.prev` is also its key in
    /// `PartialGraphRunningState::inflight_barriers`.
    epoch: EpochPair,
    /// Workers whose `BarrierCompleteResponse` is still outstanding; an entry is removed
    /// as each worker's response is collected.
    node_to_collect: NodeToCollect,
    /// Responses collected so far, keyed by the responding worker.
    resps: HashMap<WorkerId, BarrierCompleteResponse>,
}
46
47#[derive(Debug)]
48struct PartialGraphRunningState {
49    /// `prev_epoch` -> barrier
50    inflight_barriers: BTreeMap<u64, PartialGraphInflightBarrier>,
51}
52
53impl PartialGraphRunningState {
54    fn new() -> Self {
55        Self {
56            inflight_barriers: Default::default(),
57        }
58    }
59
60    fn enqueue(&mut self, epoch: EpochPair, node_to_collect: NodeToCollect) {
61        if let Some((last_prev_epoch, last_barrier)) = self.inflight_barriers.last_key_value() {
62            assert_eq!(last_barrier.epoch.curr, epoch.prev);
63            assert!(*last_prev_epoch < epoch.prev);
64        }
65        self.inflight_barriers
66            .try_insert(
67                epoch.prev,
68                PartialGraphInflightBarrier {
69                    epoch,
70                    node_to_collect,
71                    resps: Default::default(),
72                },
73            )
74            .expect("non-duplicated");
75    }
76
77    fn collect(&mut self, resp: BarrierCompleteResponse) {
78        debug!(
79            epoch = resp.epoch,
80            worker_id = %resp.worker_id,
81            partial_graph_id = %resp.partial_graph_id,
82            "collect barrier from worker"
83        );
84        let inflight_barrier = self
85            .inflight_barriers
86            .get_mut(&resp.epoch)
87            .expect("should exist");
88        assert!(inflight_barrier.node_to_collect.remove(&resp.worker_id));
89        inflight_barrier
90            .resps
91            .try_insert(resp.worker_id, resp)
92            .expect("non-duplicate");
93    }
94
95    fn barrier_collected(&mut self) -> Option<CollectedBarrier> {
96        if let Some(entry) = self.inflight_barriers.first_entry()
97            && entry.get().node_to_collect.is_empty()
98        {
99            let PartialGraphInflightBarrier { epoch, resps, .. } = entry.remove();
100            Some(CollectedBarrier { epoch, resps })
101        } else {
102            None
103        }
104    }
105}
106
107#[derive(Debug)]
108struct ResetPartialGraphCollector {
109    remaining_workers: HashSet<WorkerId>,
110    reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
111}
112
113impl ResetPartialGraphCollector {
114    fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) -> bool {
115        assert!(self.remaining_workers.remove(&worker_id));
116        self.reset_resps
117            .try_insert(worker_id, resp)
118            .expect("non-duplicate");
119        self.remaining_workers.is_empty()
120    }
121}
122
123#[derive(Debug)]
124enum PartialGraphStatus {
125    Running(PartialGraphRunningState),
126    Resetting(ResetPartialGraphCollector),
127    Initializing {
128        epoch: EpochPair,
129        node_to_collect: NodeToCollect,
130    },
131}
132
133impl PartialGraphStatus {
134    fn collect(
135        &mut self,
136        worker_id: WorkerId,
137        resp: BarrierCompleteResponse,
138    ) -> Option<PartialGraphEvent> {
139        assert_eq!(worker_id, resp.worker_id);
140        match self {
141            PartialGraphStatus::Running(state) => {
142                state.collect(resp);
143                state
144                    .barrier_collected()
145                    .map(PartialGraphEvent::BarrierCollected)
146            }
147            PartialGraphStatus::Resetting(_) => None,
148            PartialGraphStatus::Initializing {
149                epoch,
150                node_to_collect,
151            } => {
152                assert_eq!(epoch.prev, resp.epoch);
153                assert!(node_to_collect.remove(&worker_id));
154                if node_to_collect.is_empty() {
155                    *self = PartialGraphStatus::Running(PartialGraphRunningState::new());
156                    Some(PartialGraphEvent::Initialized)
157                } else {
158                    None
159                }
160            }
161        }
162    }
163}
164
/// The oldest in-flight barrier of a running partial graph, handed out once no worker
/// is left to collect it from.
#[derive(Debug)]
pub(super) struct CollectedBarrier {
    /// Epoch pair of the collected barrier.
    pub epoch: EpochPair,
    /// Barrier-complete responses, keyed by the worker that sent them.
    pub resps: HashMap<WorkerId, BarrierCompleteResponse>,
}
170
/// Progress of a single partial graph, reported by `PartialGraphManager::next_event`.
pub(super) enum PartialGraphEvent {
    /// The oldest in-flight barrier of a running graph has been collected.
    BarrierCollected(CollectedBarrier),
    /// No worker is left to acknowledge the reset (each one either responded or failed);
    /// carries the responses received, and the graph is no longer tracked.
    Reset(HashMap<WorkerId, ResetPartialGraphResponse>),
    /// The initial barrier has been collected; the graph is now running.
    Initialized,
    /// The given worker reported a failure of this (running or initializing) graph.
    Error(WorkerId),
}
177
/// Worker-level event reported by `PartialGraphManager::next_event`.
pub(super) enum WorkerEvent {
    /// The worker's control stream yielded an error instead of a response.
    WorkerError {
        err: MetaError,
        /// Running or initializing graphs with an outstanding barrier for which
        /// `is_valid_after_worker_err` rejected the failure of this worker.
        affected_partial_graphs: HashSet<PartialGraphId>,
    },
    /// A worker connection was established and initialized with every graph that is
    /// not being reset.
    WorkerConnected,
}
185
186fn existing_graphs(
187    graphs: &HashMap<PartialGraphId, PartialGraphStatus>,
188) -> impl Iterator<Item = PartialGraphId> + '_ {
189    graphs
190        .iter()
191        .filter_map(|(partial_graph_id, status)| match status {
192            PartialGraphStatus::Running(_) | PartialGraphStatus::Initializing { .. } => {
193                Some(*partial_graph_id)
194            }
195            PartialGraphStatus::Resetting(_) => None,
196        })
197}
198
/// Tracks the status of every partial graph and drives barrier and reset collection
/// over the worker control streams.
pub(super) struct PartialGraphManager {
    control_stream_manager: ControlStreamManager,
    /// Id of the current term: a fresh UUID created by `recover`, or `"uninitialized"`
    /// before the first recovery. Passed to the control stream manager.
    term_id: String,
    /// Status of every tracked partial graph.
    graphs: HashMap<PartialGraphId, PartialGraphStatus>,
}
204
205impl PartialGraphManager {
206    pub(super) fn uninitialized(env: MetaSrvEnv) -> Self {
207        Self {
208            control_stream_manager: ControlStreamManager::new(env),
209            term_id: "uninitialized".to_owned(),
210            graphs: HashMap::new(),
211        }
212    }
213
214    pub(super) async fn recover(
215        env: MetaSrvEnv,
216        nodes: &HashMap<WorkerId, WorkerNode>,
217        context: Arc<impl GlobalBarrierWorkerContext>,
218    ) -> Self {
219        let term_id = Uuid::new_v4().to_string();
220        let control_stream_manager =
221            ControlStreamManager::recover(env, nodes, &term_id, context).await;
222        Self {
223            control_stream_manager,
224            term_id,
225            graphs: Default::default(),
226        }
227    }
228
229    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
230        &self.control_stream_manager
231    }
232
233    pub(super) async fn add_worker(
234        &mut self,
235        node: WorkerNode,
236        context: &impl GlobalBarrierWorkerContext,
237    ) {
238        self.control_stream_manager
239            .add_worker(node, existing_graphs(&self.graphs), &self.term_id, context)
240            .await
241    }
242
243    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
244        self.control_stream_manager.remove_worker(node);
245    }
246
247    pub(super) fn clear_worker(&mut self) {
248        self.control_stream_manager.clear();
249    }
250}
251
252#[must_use]
253pub(super) struct PartialGraphAdder<'a> {
254    partial_graph_id: PartialGraphId,
255    manager: &'a mut PartialGraphManager,
256    consumed: bool,
257}
258
259impl PartialGraphAdder<'_> {
260    pub(super) fn added(mut self) {
261        self.consumed = true;
262    }
263
264    pub(super) fn failed(mut self) {
265        self.manager.reset_partial_graphs([self.partial_graph_id]);
266        self.consumed = true;
267    }
268
269    pub(super) fn manager(&mut self) -> &mut PartialGraphManager {
270        self.manager
271    }
272}
273
274impl Drop for PartialGraphAdder<'_> {
275    fn drop(&mut self) {
276        debug_assert!(self.consumed, "unconsumed graph adder");
277        if !self.consumed {
278            warn!(partial_graph_id = %self.partial_graph_id, "unconsumed graph adder");
279        }
280    }
281}
282
283impl PartialGraphManager {
284    pub(super) fn add_partial_graph(
285        &mut self,
286        partial_graph_id: PartialGraphId,
287    ) -> PartialGraphAdder<'_> {
288        self.graphs
289            .try_insert(
290                partial_graph_id,
291                PartialGraphStatus::Running(PartialGraphRunningState::new()),
292            )
293            .expect("non-duplicated");
294        self.control_stream_manager
295            .add_partial_graph(partial_graph_id);
296        PartialGraphAdder {
297            partial_graph_id,
298            manager: self,
299            consumed: false,
300        }
301    }
302
303    pub(super) fn remove_partial_graphs(&mut self, partial_graphs: Vec<PartialGraphId>) {
304        for partial_graph_id in &partial_graphs {
305            let graph = self.graphs.remove(partial_graph_id).expect("should exist");
306            let PartialGraphStatus::Running(state) = graph else {
307                panic!("graph to be explicitly removed should be running");
308            };
309            assert!(state.inflight_barriers.is_empty());
310        }
311        self.control_stream_manager
312            .remove_partial_graphs(partial_graphs);
313    }
314
315    pub(super) fn reset_partial_graphs(
316        &mut self,
317        partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
318    ) {
319        let partial_graph_ids = partial_graph_ids.into_iter().collect_vec();
320        let remaining_workers = self
321            .control_stream_manager
322            .reset_partial_graphs(partial_graph_ids.clone());
323        let new_collector = || ResetPartialGraphCollector {
324            remaining_workers: remaining_workers.clone(),
325            reset_resps: Default::default(),
326        };
327        for partial_graph_id in partial_graph_ids {
328            match self.graphs.entry(partial_graph_id) {
329                Entry::Vacant(entry) => {
330                    entry.insert(PartialGraphStatus::Resetting(new_collector()));
331                }
332                Entry::Occupied(mut entry) => {
333                    let graph = entry.get_mut();
334                    match graph {
335                        PartialGraphStatus::Resetting(_) => {
336                            unreachable!("should not reset again")
337                        }
338                        PartialGraphStatus::Running(_)
339                        | PartialGraphStatus::Initializing { .. } => {
340                            *graph = PartialGraphStatus::Resetting(new_collector());
341                        }
342                    }
343                }
344            }
345        }
346    }
347
348    pub(super) fn assert_resetting(&self, partial_graph_id: PartialGraphId) {
349        let graph = self.graphs.get(&partial_graph_id).expect("should exist");
350        let PartialGraphStatus::Resetting(..) = graph else {
351            panic!("should be at resetting but at {:?}", graph);
352        };
353    }
354
355    pub(super) fn inject_barrier(
356        &mut self,
357        partial_graph_id: PartialGraphId,
358        mutation: Option<Mutation>,
359        barrier_info: &BarrierInfo,
360        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
361        table_ids_to_sync: impl Iterator<Item = TableId>,
362        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
363        new_actors: Option<StreamJobActorsToCreate>,
364    ) -> MetaResult<()> {
365        let graph = self
366            .graphs
367            .get_mut(&partial_graph_id)
368            .expect("should exist");
369        let node_to_collect = self.control_stream_manager.inject_barrier(
370            partial_graph_id,
371            mutation,
372            barrier_info,
373            node_actors,
374            table_ids_to_sync,
375            nodes_to_sync_table,
376            new_actors,
377        )?;
378        let PartialGraphStatus::Running(state) = graph else {
379            panic!("should not inject barrier on non-running status: {graph:?}")
380        };
381        state.enqueue(barrier_info.epoch(), node_to_collect);
382        Ok(())
383    }
384
385    pub(super) fn start_recover(&mut self) -> PartialGraphRecoverer<'_> {
386        PartialGraphRecoverer {
387            added_partial_graphs: Default::default(),
388            manager: self,
389            consumed: false,
390        }
391    }
392}
393
/// Guard returned by `PartialGraphManager::start_recover` for re-creating partial graphs
/// with an initial barrier.
///
/// It must be finished with `all_initializing` or `failed`; dropping it unconsumed is
/// reported by its `Drop` implementation.
#[must_use]
pub(super) struct PartialGraphRecoverer<'a> {
    /// Ids passed to `recover_graph` so far, including ones whose barrier injection failed.
    added_partial_graphs: HashSet<PartialGraphId>,
    manager: &'a mut PartialGraphManager,
    /// Set once `all_initializing` or `failed` has been called.
    consumed: bool,
}
400
401impl PartialGraphRecoverer<'_> {
402    pub(super) fn recover_graph(
403        &mut self,
404        partial_graph_id: PartialGraphId,
405        mutation: Mutation,
406        barrier_info: &BarrierInfo,
407        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
408        table_ids_to_sync: impl Iterator<Item = TableId>,
409        new_actors: StreamJobActorsToCreate,
410    ) -> MetaResult<()> {
411        assert!(
412            self.added_partial_graphs.insert(partial_graph_id),
413            "duplicated recover graph {partial_graph_id}"
414        );
415        self.manager
416            .control_stream_manager
417            .add_partial_graph(partial_graph_id);
418        assert!(barrier_info.kind.is_initial());
419        let node_to_collect = self.manager.control_stream_manager.inject_barrier(
420            partial_graph_id,
421            Some(mutation),
422            barrier_info,
423            node_actors,
424            table_ids_to_sync,
425            node_actors.keys().copied(),
426            Some(new_actors),
427        )?;
428        self.manager
429            .graphs
430            .try_insert(
431                partial_graph_id,
432                PartialGraphStatus::Initializing {
433                    epoch: barrier_info.epoch(),
434                    node_to_collect,
435                },
436            )
437            .expect("non-duplicated");
438        Ok(())
439    }
440
441    pub(super) fn control_stream_manager(&self) -> &ControlStreamManager {
442        &self.manager.control_stream_manager
443    }
444
445    pub(super) fn all_initializing(mut self) -> HashSet<PartialGraphId> {
446        self.consumed = true;
447        take(&mut self.added_partial_graphs)
448    }
449
450    pub(super) fn failed(mut self) -> HashSet<PartialGraphId> {
451        self.manager
452            .reset_partial_graphs(self.added_partial_graphs.iter().copied());
453        self.consumed = true;
454        take(&mut self.added_partial_graphs)
455    }
456}
457
458impl Drop for PartialGraphRecoverer<'_> {
459    fn drop(&mut self) {
460        debug_assert!(self.consumed, "unconsumed graph recoverer");
461        if !self.consumed {
462            warn!(partial_graph_ids = ?self.added_partial_graphs, "unconsumed graph recoverer");
463        }
464    }
465}
466
/// Event returned by `PartialGraphManager::next_event`.
#[must_use]
pub(super) enum PartialGraphManagerEvent {
    /// Progress of a single partial graph.
    PartialGraph(PartialGraphId, PartialGraphEvent),
    /// A failure or a (re)connection of a worker.
    Worker(WorkerId, WorkerEvent),
}
472
impl PartialGraphManager {
    /// Returns the next event of any partial graph or worker.
    ///
    /// First returns, without awaiting, an event that is already complete in the tracked
    /// state: a fully collected barrier, a reset with no worker left to wait for, or an
    /// initialization with no worker left to collect from. Such events can be left behind
    /// because one response only reports the oldest collected barrier, or because a worker
    /// error removed the last worker a reset was waiting for. Otherwise it awaits events
    /// from the control stream manager until one of them yields an event.
    ///
    /// # Panics
    /// On responses for graphs that are not tracked, and on `Init`/`Shutdown` responses
    /// (expected to be handled by the control stream manager). In debug builds, also on a
    /// reset response for a graph that is not resetting.
    pub(super) async fn next_event(
        &mut self,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> PartialGraphManagerEvent {
        // Drain events that are already complete before waiting for new responses.
        for (&partial_graph_id, graph) in &mut self.graphs {
            match graph {
                PartialGraphStatus::Running(state) => {
                    if let Some(collected) = state.barrier_collected() {
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::BarrierCollected(collected),
                        );
                    }
                }
                PartialGraphStatus::Resetting(collector) => {
                    if collector.remaining_workers.is_empty() {
                        let resps = take(&mut collector.reset_resps);
                        // Removing during iteration is fine here: the loop is left right away.
                        self.graphs.remove(&partial_graph_id);
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Reset(resps),
                        );
                    }
                }
                PartialGraphStatus::Initializing {
                    node_to_collect, ..
                } => {
                    // NOTE(review): presumably only reachable when the initial barrier had no
                    // worker to collect from — confirm.
                    if node_to_collect.is_empty() {
                        *graph = PartialGraphStatus::Running(PartialGraphRunningState::new());
                        return PartialGraphManagerEvent::PartialGraph(
                            partial_graph_id,
                            PartialGraphEvent::Initialized,
                        );
                    }
                }
            }
        }
        loop {
            let (worker_id, event) = self
                .control_stream_manager
                .next_event(&self.term_id, context)
                .await;
            match event {
                WorkerNodeEvent::Response(result) => match result {
                    Ok(resp) => match resp {
                        Response::CompleteBarrier(resp) => {
                            // Completes a running graph's barrier or an initializing graph's
                            // initial barrier; dropped for resetting graphs.
                            let partial_graph_id = resp.partial_graph_id;
                            if let Some(event) = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist")
                                .collect(worker_id, resp)
                            {
                                return PartialGraphManagerEvent::PartialGraph(
                                    partial_graph_id,
                                    event,
                                );
                            }
                        }
                        Response::ReportPartialGraphFailure(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                PartialGraphStatus::Resetting(_) => {
                                    // ignore reported error when resetting
                                }
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    return PartialGraphManagerEvent::PartialGraph(
                                        partial_graph_id,
                                        PartialGraphEvent::Error(worker_id),
                                    );
                                }
                            }
                        }
                        Response::ResetPartialGraph(resp) => {
                            let partial_graph_id = resp.partial_graph_id;
                            let graph = self
                                .graphs
                                .get_mut(&partial_graph_id)
                                .expect("should exist");
                            match graph {
                                // A reset response is only expected while resetting: panic in
                                // debug builds, log and ignore it in release builds.
                                PartialGraphStatus::Running(_)
                                | PartialGraphStatus::Initializing { .. } => {
                                    if cfg!(debug_assertions) {
                                        unreachable!(
                                            "should not have reset request when not in resetting state"
                                        )
                                    } else {
                                        warn!(
                                            ?resp,
                                            "ignore reset resp when not in Resetting state"
                                        );
                                    }
                                }
                                PartialGraphStatus::Resetting(collector) => {
                                    // Once the last awaited worker has responded, hand over the
                                    // responses and stop tracking the graph.
                                    if collector.collect(worker_id, resp) {
                                        let resps = take(&mut collector.reset_resps);
                                        self.graphs.remove(&partial_graph_id);
                                        return PartialGraphManagerEvent::PartialGraph(
                                            partial_graph_id,
                                            PartialGraphEvent::Reset(resps),
                                        );
                                    }
                                }
                            }
                        }
                        Response::Init(_) | Response::Shutdown(_) => {
                            unreachable!("should be handled in control stream manager")
                        }
                    },
                    Err(error) => {
                        // Running/initializing graphs whose outstanding barriers cannot survive
                        // this worker's failure (per `is_valid_after_worker_err`) are reported as
                        // affected. Resetting graphs stop waiting for the worker; if that empties
                        // their set, the drain at the top of the next call emits `Reset`.
                        let affected_partial_graphs = self
                            .graphs
                            .iter_mut()
                            .filter_map(|(partial_graph_id, graph)| match graph {
                                PartialGraphStatus::Running(state) => state
                                    .inflight_barriers
                                    .values()
                                    .any(|inflight_barrier| {
                                        !is_valid_after_worker_err(
                                            &inflight_barrier.node_to_collect,
                                            worker_id,
                                        )
                                    })
                                    .then_some(*partial_graph_id),
                                PartialGraphStatus::Resetting(collector) => {
                                    collector.remaining_workers.remove(&worker_id);
                                    None
                                }
                                PartialGraphStatus::Initializing {
                                    node_to_collect, ..
                                } => (!is_valid_after_worker_err(node_to_collect, worker_id))
                                    .then_some(*partial_graph_id),
                            })
                            .collect();
                        return PartialGraphManagerEvent::Worker(
                            worker_id,
                            WorkerEvent::WorkerError {
                                err: error,
                                affected_partial_graphs,
                            },
                        );
                    }
                },
                WorkerNodeEvent::Connected(connected) => {
                    // Tell the connected worker about every graph that is not being reset.
                    connected.initialize(existing_graphs(&self.graphs));
                    return PartialGraphManagerEvent::Worker(
                        worker_id,
                        WorkerEvent::WorkerConnected,
                    );
                }
            }
        }
    }
}