risingwave_meta/barrier/rpc.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
    streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::partial_graph::PartialGraphRecoverer;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::NodeToCollect;
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

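// A `PartialGraphId` packs two 32-bit ids into a single u64: the database id
// occupies the high 32 bits and the creating job id the low 32 bits, with
// `u32::MAX` reserved as the sentinel meaning "no creating job", i.e. the
// database-level partial graph. A round-trip sketch using the functions
// defined below:
//
// ```ignore
// let pg = to_partial_graph_id(233.into(), None);
// assert_eq!((pg.as_raw_id() >> 32) as u32, 233);
// assert_eq!((pg.as_raw_id() & u32::MAX as u64) as u32, u32::MAX);
// assert_eq!(from_partial_graph_id(pg), (233.into(), None));
// ```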
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}
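// Walks each fragment's plan tree and records the state table ids of any
// `LocalityProvider` nodes, stopping the descent below such nodes. Fragments
// without a `LocalityProvider` are omitted from the returned mapping.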
pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}
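// Enumerates every partial graph belonging to a database: one per creating
// job, plus the trailing `None` entry for the database-level graph itself.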
pub(super) fn database_partial_graphs<'a>(
    database_id: DatabaseId,
    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
) -> impl Iterator<Item = PartialGraphId> + 'a {
    creating_jobs
        .map(Some)
        .chain([None])
        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

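// Per-worker connection state: either a live control stream, or a boxed
// future that keeps retrying the connection in the background.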
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        partial_graphs: impl Iterator<Item = PartialGraphId>,
        term_id: &String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(partial_graphs);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may
                    // not have been propagated to the meta node yet, causing an
                    // error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

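    // Builds a future that retries connecting to the worker indefinitely with
    // exponential backoff (100ms base, capped at one minute), resolving only
    // once a control stream is established.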
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }
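    // Re-establishes control streams to all known workers in parallel during
    // recovery; workers that cannot be reached immediately are left in
    // `Reconnecting` state with a background retry future.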
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}
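// A borrowed view of a freshly (re)connected worker, used to (re)create its
// partial graphs right after the control stream is established.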
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
        for partial_graph_id in partial_graphs {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(
                        PbCreatePartialGraphRequest { partial_graph_id },
                    ),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
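    // Polls all worker streams for the next event. `this_opt` holds the
    // `&'a mut Self` in an `Option` so that, when an event borrowing from the
    // manager is returned, the long-lived borrow can be moved out of the
    // option rather than reborrowed from the poll closure, letting the
    // returned `WorkerNodeEvent<'a>` carry the full `'a` lifetime.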
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}
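// Tracks, during recovery, which partial graphs of a database still await
// their initial-barrier acknowledgement; once all are initialized, the
// collector is finished into a `DatabaseCheckpointControl`.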
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    pub(super) database: DatabaseCheckpointControl,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("initializing_graphs", &self.initializing_partial_graphs)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.initializing_partial_graphs.is_empty()
    }

    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
    }

    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
        database_partial_graphs(
            self.database_id,
            self.database
                .creating_streaming_job_controls
                .keys()
                .copied(),
        )
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        self.database
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.database.is_valid_after_worker_err(worker_id)
    }
}

impl PartialGraphRecoverer<'_> {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
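    ///
    /// Recovery proceeds roughly in these steps, as the body below shows:
    /// 1. split the recovered jobs into regular jobs, background-creating jobs,
    ///    and ongoing snapshot backfill jobs;
    /// 2. resolve the single committed epoch shared by all state tables of the
    ///    database graph, and derive the initial barrier from it;
    /// 3. build an `Add` mutation carrying source splits, CDC snapshot split
    ///    assignments, and backfill orders, then inject the initial barrier
    ///    into the database-level partial graph;
    /// 4. recover each ongoing snapshot backfill job into its own partial graph.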
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
    ) -> MetaResult<DatabaseCheckpointControl> {
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
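        // Assembles the `Add` mutation of the initial barrier. Backfill nodes
        // that depend on other backfills start paused and are expected to be
        // resumed later by the backfill order control.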
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                // Actors built during recovery are not treated as newly added actors.
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }
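        // All state tables of one partial graph must have been committed at
        // the same epoch; assert that invariant and return the shared epoch.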
        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }
        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicate");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

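        // Materialize each recovered job into an `InflightStreamingJobInfo`:
        // background-creating jobs get a recovered progress tracker (with the
        // backfill ordering extended for locality backfill), and jobs with
        // parallelized CDC backfill get a restored split tracker whose splits
        // are reassigned to the current actors.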
        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering =
                            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                                backfill_ordering,
                                fragment_relations,
                                || {
                                    fragment_infos.iter().map(|(fragment_id, fragment)| {
                                        (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                    })
                                },
                            );
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;
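        // Build fragment edges across both the database-level graph and the
        // creating jobs' graphs, so that actor-creation requests can carry
        // the upstream/downstream actor wiring.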
        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    job.fragment_infos()
                        .map(|info| (info, to_partial_graph_id(database_id, None)))
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments
                            .values()
                            .map(move |fragment| (fragment, partial_graph_id))
                    },
                )),
            self.control_stream_manager(),
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let partial_graph_id = to_partial_graph_id(database_id, None);
            self.recover_graph(
                partial_graph_id,
                mutation,
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                new_actors,
            )?;
            debug!(
                %database_id,
                "inject initial barrier"
            );
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![], // no subscribers for backfilling jobs
                )
            }));

            // Collect the splits owned by this creating job's own source actors.
            let job_source_splits = collect_source_splits(info.values(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &job_source_splits,
                Default::default(), // snapshot backfill jobs carry no CDC snapshot splits
                &job_backfill_orders,
                false,
            );

            let job = CreatingStreamingJobControl::recover(
                database_id,
                job_id,
                upstream_table_ids,
                &database_job_log_epochs,
                snapshot_epoch,
                committed_epoch,
                &barrier_info,
                info,
                job_backfill_orders,
                fragment_relations,
                hummock_version_stats,
                node_actors,
                mutation.clone(),
                self,
            )?;
            creating_streaming_job_controls.insert(job_id, job);
        }

        self.control_stream_manager()
            .env
            .shared_actor_infos()
            .recover_database(
                database_id,
                database_jobs
                    .values()
                    .flat_map(|info| {
                        info.fragment_infos()
                            .map(move |fragment| (fragment, info.job_id))
                    })
                    .chain(
                        creating_streaming_job_controls
                            .values()
                            .flat_map(|job| job.fragment_infos_with_job_id()),
                    ),
            );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.control_stream_manager()
                .env
                .shared_actor_infos()
                .clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseCheckpointControl::recovery(
            database_id,
            database_state,
            committed_epoch,
            database_info,
            creating_streaming_job_controls,
        ))
    }
}

impl ControlStreamManager {
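    // Iterates only the workers whose control streams are currently live;
    // requests are never sent to workers that are still reconnecting.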
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }
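    // Fans the barrier out to every worker that hosts actors of the partial
    // graph. Each request carries the actor ids to collect, the state tables
    // to sync (only on the designated workers), and any actors that must be
    // built before the barrier is injected.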
    pub(super) fn inject_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
        });

        let mut node_need_collect = NodeToCollect::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors.iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    && let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker");
            }
        });
    }

    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}
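/// Merges per-worker RPC errors into a single `MetaError`: a lone error is
/// returned directly; if any error carries a `Score`, the highest-scored one
/// wins (it is the most likely root cause); otherwise all errors are
/// concatenated into one message.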
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from the single error.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
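
    #[test]
    fn test_partial_graph_id_bit_layout() {
        // A minimal sketch of the packing described at the top of this file,
        // assuming `PartialGraphId::as_raw_id` exposes the underlying `u64`:
        // the database id sits in the high 32 bits and the `u32::MAX`
        // sentinel fills the low 32 bits when there is no creating job.
        let pg = to_partial_graph_id(233.into(), None);
        assert_eq!(pg.as_raw_id(), (233u64 << 32) | (u32::MAX as u64));
    }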
}