risingwave_meta/barrier/rpc.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46    BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50    RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
54    streaming_control_stream_request, streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
69};
70use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
71use crate::barrier::edge_builder::FragmentEdgeBuilder;
72use crate::barrier::info::{
73    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
74    SubscriberType,
75};
76use crate::barrier::progress::CreateMviewProgressTracker;
77use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
78use crate::controller::fragment::InflightFragmentInfo;
79use crate::manager::MetaSrvEnv;
80use crate::model::{
81    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
82    SubscriptionId,
83};
84use crate::stream::cdc::{
85    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
86};
87use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
88use crate::{MetaError, MetaResult};
89
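/// Packs a database id and an optional creating-job id into a single `PartialGraphId`:
/// the database id occupies the upper 32 bits and the job id the lower 32 bits, with
/// `u32::MAX` in the lower half denoting the database-level graph (no creating job).
/// `from_partial_graph_id` is the inverse.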
90pub(super) fn to_partial_graph_id(
91    database_id: DatabaseId,
92    creating_job_id: Option<JobId>,
93) -> PartialGraphId {
94    let raw_job_id = creating_job_id
95        .map(|job_id| {
96            assert_ne!(job_id, u32::MAX);
97            job_id.as_raw_id()
98        })
99        .unwrap_or(u32::MAX);
100    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
101}
102
103pub(super) fn from_partial_graph_id(
104    partial_graph_id: PartialGraphId,
105) -> (DatabaseId, Option<JobId>) {
106    let id = partial_graph_id.as_raw_id();
107    let database_id = (id >> 32) as u32;
108    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
109    let creating_job_id = if raw_creating_job_id == u32::MAX {
110        None
111    } else {
112        Some(JobId::new(raw_creating_job_id))
113    };
114    (database_id.into(), creating_job_id)
115}
116
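/// Visits each fragment's node tree and collects the state table ids of its
/// `LocalityProvider` nodes, keyed by fragment id. Fragments without any
/// `LocalityProvider` node are omitted from the returned mapping.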
117fn build_locality_fragment_state_table_mapping(
118    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
119) -> HashMap<FragmentId, Vec<TableId>> {
120    let mut mapping = HashMap::new();
121
122    for (fragment_id, fragment_info) in fragment_infos {
123        let mut state_table_ids = Vec::new();
124        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
125            if let Some(NodeBody::LocalityProvider(locality_provider)) =
126                stream_node.node_body.as_ref()
127            {
128                let state_table_id = locality_provider
129                    .state_table
130                    .as_ref()
131                    .expect("must have state table")
132                    .id;
133                state_table_ids.push(state_table_id);
134                false
135            } else {
136                true
137            }
138        });
139        if !state_table_ids.is_empty() {
140            mapping.insert(*fragment_id, state_table_ids);
141        }
142    }
143
144    mapping
145}
146
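/// An established control stream to a single streaming worker, along with the worker's
/// identity, used to push barrier injection and partial-graph management requests.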
147struct ControlStreamNode {
148    worker_id: WorkerId,
149    host: HostAddress,
150    handle: StreamingControlHandle,
151}
152
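/// Connection state of a worker as tracked by `ControlStreamManager`: either `Connected`
/// with a live control stream (plus a `removed` flag set once the worker is unregistered),
/// or `Reconnecting` with a pending retry future that resolves to a fresh handle.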
153enum WorkerNodeState {
154    Connected {
155        control_stream: ControlStreamNode,
156        removed: bool,
157    },
158    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
159}
160
161pub(super) struct ControlStreamManager {
162    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
163    pub env: MetaSrvEnv,
164}
165
166impl ControlStreamManager {
167    pub(super) fn new(env: MetaSrvEnv) -> Self {
168        Self {
169            workers: Default::default(),
170            env,
171        }
172    }
173
174    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
175        self.workers[&worker_id].0.host.clone().unwrap()
176    }
177
178    pub(super) async fn add_worker(
179        &mut self,
180        node: WorkerNode,
181        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
182        term_id: String,
183        context: &impl GlobalBarrierWorkerContext,
184    ) {
185        let node_id = node.id;
186        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
187            let (existing_node, worker_state) = entry.get();
188            assert_eq!(existing_node.host, node.host);
189            warn!(id = %node.id, host = ?node.host, "node already exists");
190            match worker_state {
191                WorkerNodeState::Connected { .. } => {
192                    warn!(id = %node.id, host = ?node.host, "new node already connected");
193                    return;
194                }
195                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "removing previous pending connect request and reconnecting");
197                    entry.remove();
198                }
199            }
200        }
201        let node_host = node.host.clone().unwrap();
202        let mut backoff = ExponentialBackoff::from_millis(100)
203            .max_delay(Duration::from_secs(3))
204            .factor(5);
205        const MAX_RETRY: usize = 5;
206        for i in 1..=MAX_RETRY {
207            match context
208                .new_control_stream(
209                    &node,
210                    &PbInitRequest {
211                        term_id: term_id.clone(),
212                    },
213                )
214                .await
215            {
216                Ok(mut handle) => {
217                    WorkerNodeConnected {
218                        handle: &mut handle,
219                        node: &node,
220                    }
221                    .initialize(inflight_infos);
222                    info!(?node_host, "add control stream worker");
223                    assert!(
224                        self.workers
225                            .insert(
226                                node_id,
227                                (
228                                    node,
229                                    WorkerNodeState::Connected {
230                                        control_stream: ControlStreamNode {
231                                            worker_id: node_id as _,
232                                            host: node_host,
233                                            handle,
234                                        },
235                                        removed: false
236                                    }
237                                )
238                            )
239                            .is_none()
240                    );
241                    return;
242                }
243                Err(e) => {
                    // The DNS information of a newly registered worker node may not have been
                    // propagated to the meta node yet, which can cause this error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to resolve worker node address");
248                    sleep(delay).await;
249                }
250            }
251        }
        error!(?node_host, "failed to create control stream to worker node after retries");
253    }
254
255    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
256        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
257            let (_, worker_state) = entry.get_mut();
258            match worker_state {
259                WorkerNodeState::Connected { removed, .. } => {
260                    info!(worker_id = %node.id, "mark connected worker as removed");
261                    *removed = true;
262                }
263                WorkerNodeState::Reconnecting(_) => {
264                    info!(worker_id = %node.id, "remove worker");
265                    entry.remove();
266                }
267            }
268        }
269    }
270
271    fn retry_connect(
272        node: WorkerNode,
273        term_id: String,
274        context: Arc<impl GlobalBarrierWorkerContext>,
275    ) -> BoxFuture<'static, StreamingControlHandle> {
276        async move {
277            let mut attempt = 0;
278            let backoff = ExponentialBackoff::from_millis(100)
279                .max_delay(Duration::from_mins(1))
280                .factor(5);
281            let init_request = PbInitRequest { term_id };
282            for delay in backoff {
283                attempt += 1;
284                sleep(delay).await;
285                match context.new_control_stream(&node, &init_request).await {
286                    Ok(handle) => {
287                        return handle;
288                    }
289                    Err(e) => {
290                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
291                    }
292                }
293            }
294            unreachable!("end of retry backoff")
295        }.boxed()
296    }
297
298    pub(super) async fn recover(
299        env: MetaSrvEnv,
300        nodes: &HashMap<WorkerId, WorkerNode>,
301        term_id: &str,
302        context: Arc<impl GlobalBarrierWorkerContext>,
303    ) -> Self {
304        let reset_start_time = Instant::now();
305        let init_request = PbInitRequest {
306            term_id: term_id.to_owned(),
307        };
308        let init_request = &init_request;
309        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
310            let result = context.new_control_stream(node, init_request).await;
311            (*worker_id, node.clone(), result)
312        }))
313        .await;
314        let mut unconnected_workers = HashSet::new();
315        let mut workers = HashMap::new();
316        for (worker_id, node, result) in nodes {
317            match result {
318                Ok(handle) => {
319                    let control_stream = ControlStreamNode {
320                        worker_id: node.id,
321                        host: node.host.clone().unwrap(),
322                        handle,
323                    };
324                    assert!(
325                        workers
326                            .insert(
327                                worker_id,
328                                (
329                                    node,
330                                    WorkerNodeState::Connected {
331                                        control_stream,
332                                        removed: false
333                                    }
334                                )
335                            )
336                            .is_none()
337                    );
338                }
339                Err(e) => {
340                    unconnected_workers.insert(worker_id);
341                    warn!(
342                        e = %e.as_report(),
343                        %worker_id,
344                        ?node,
345                        "failed to connect to node"
346                    );
347                    assert!(
348                        workers
349                            .insert(
350                                worker_id,
351                                (
352                                    node.clone(),
353                                    WorkerNodeState::Reconnecting(Self::retry_connect(
354                                        node,
355                                        term_id.to_owned(),
356                                        context.clone()
357                                    ))
358                                )
359                            )
360                            .is_none()
361                    );
362                }
363            }
364        }
365
366        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
367
368        Self { workers, env }
369    }
370
371    /// Clear all nodes and response streams in the manager.
372    pub(super) fn clear(&mut self) {
373        *self = Self::new(self.env.clone());
374    }
375}
376
377pub(super) struct WorkerNodeConnected<'a> {
378    node: &'a WorkerNode,
379    handle: &'a mut StreamingControlHandle,
380}
381
382impl<'a> WorkerNodeConnected<'a> {
383    pub(super) fn initialize(
384        self,
385        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
386    ) {
387        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
388            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
389                request: Some(
390                    streaming_control_stream_request::Request::CreatePartialGraph(request),
391                ),
392            }) {
393                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
394            }
395        }
396    }
397}
398
399pub(super) enum WorkerNodeEvent<'a> {
400    Response(MetaResult<streaming_control_stream_response::Response>),
401    Connected(WorkerNodeConnected<'a>),
402}
403
404impl ControlStreamManager {
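    /// Polls all workers once and returns the first ready event. Reconnect futures are only
    /// polled when `poll_reconnect` is true; for connected workers the response stream is
    /// polled, and on error the worker is either dropped (if removed or shutting down) or
    /// moved back to `Reconnecting`.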
405    fn poll_next_event<'a>(
406        this_opt: &mut Option<&'a mut Self>,
407        cx: &mut Context<'_>,
408        term_id: &str,
409        context: &Arc<impl GlobalBarrierWorkerContext>,
410        poll_reconnect: bool,
411    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
412        let this = this_opt.as_mut().expect("Future polled after completion");
413        if this.workers.is_empty() {
414            return Poll::Pending;
415        }
416        {
417            for (&worker_id, (node, worker_state)) in &mut this.workers {
418                let control_stream = match worker_state {
419                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
420                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
421                        continue;
422                    }
423                    WorkerNodeState::Reconnecting(join_handle) => {
424                        match join_handle.poll_unpin(cx) {
425                            Poll::Ready(handle) => {
426                                info!(id=%node.id, host=?node.host, "reconnected to worker");
427                                *worker_state = WorkerNodeState::Connected {
428                                    control_stream: ControlStreamNode {
429                                        worker_id: node.id,
430                                        host: node.host.clone().unwrap(),
431                                        handle,
432                                    },
433                                    removed: false,
434                                };
435                                let this = this_opt.take().expect("should exist");
436                                let (node, worker_state) =
437                                    this.workers.get_mut(&worker_id).expect("should exist");
438                                let WorkerNodeState::Connected { control_stream, .. } =
439                                    worker_state
440                                else {
441                                    unreachable!()
442                                };
443                                return Poll::Ready((
444                                    worker_id,
445                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
446                                        node,
447                                        handle: &mut control_stream.handle,
448                                    }),
449                                ));
450                            }
451                            Poll::Pending => {
452                                continue;
453                            }
454                        }
455                    }
456                };
457                match control_stream.handle.response_stream.poll_next_unpin(cx) {
458                    Poll::Ready(result) => {
459                        {
460                            let result = result
461                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
462                                .and_then(|result| {
463                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
464                                        match resp
465                                            .response
466                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
467                                        {
468                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
469                                                "worker node {worker_id} is shutting down"
470                                            )
471                                                .into())),
472                                            streaming_control_stream_response::Response::Init(_) => {
473                                                // This arm should be unreachable.
474                                                Err((false, anyhow!("get unexpected init response").into()))
475                                            }
476                                            resp => {
477                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
478                                                    assert_eq!(worker_id, barrier_resp.worker_id);
479                                                }
480                                                Ok(resp)
481                                            }
482                                        }
483                                    })
484                                });
485                            let result = match result {
486                                Ok(resp) => Ok(resp),
487                                Err((shutdown, err)) => {
488                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
489                                    let WorkerNodeState::Connected { removed, .. } = worker_state
490                                    else {
491                                        unreachable!("checked connected")
492                                    };
493                                    if *removed || shutdown {
494                                        this.workers.remove(&worker_id);
495                                    } else {
496                                        *worker_state = WorkerNodeState::Reconnecting(
497                                            ControlStreamManager::retry_connect(
498                                                node.clone(),
499                                                term_id.to_owned(),
500                                                context.clone(),
501                                            ),
502                                        );
503                                    }
504                                    Err(err)
505                                }
506                            };
507                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
508                        }
509                    }
510                    Poll::Pending => {
511                        continue;
512                    }
513                }
514            }
515        };
516
517        Poll::Pending
518    }
519
520    #[await_tree::instrument("control_stream_next_event")]
521    pub(super) async fn next_event<'a>(
522        &'a mut self,
523        term_id: &str,
524        context: &Arc<impl GlobalBarrierWorkerContext>,
525    ) -> (WorkerId, WorkerNodeEvent<'a>) {
526        let mut this = Some(self);
527        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
528    }
529
530    #[await_tree::instrument("control_stream_next_response")]
531    pub(super) async fn next_response(
532        &mut self,
533        term_id: &str,
534        context: &Arc<impl GlobalBarrierWorkerContext>,
535    ) -> (
536        WorkerId,
537        MetaResult<streaming_control_stream_response::Response>,
538    ) {
539        let mut this = Some(self);
540        let (worker_id, event) =
541            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
542        match event {
543            WorkerNodeEvent::Response(result) => (worker_id, result),
544            WorkerNodeEvent::Connected(_) => {
545                unreachable!("set poll_reconnect=false")
546            }
547        }
548    }
549
550    fn collect_init_partial_graph(
551        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
552    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
553        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
554            [PbCreatePartialGraphRequest {
555                partial_graph_id: to_partial_graph_id(database_id, None),
556            }]
557            .into_iter()
558            .chain(
559                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
560                    partial_graph_id: to_partial_graph_id(database_id, Some(job_id)),
561                }),
562            )
563        })
564    }
565}
566
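/// Tracks the collection of the initial barrier injected for one database during recovery:
/// the workers that still need to report back, plus the recovered state required to build
/// the `DatabaseCheckpointControl` once collection finishes.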
567pub(super) struct DatabaseInitialBarrierCollector {
568    database_id: DatabaseId,
569    node_to_collect: NodeToCollect,
570    database_state: BarrierWorkerState,
571    database_info: InflightDatabaseInfo,
572    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
573    committed_epoch: u64,
574}
575
576impl Debug for DatabaseInitialBarrierCollector {
577    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
578        f.debug_struct("DatabaseInitialBarrierCollector")
579            .field("database_id", &self.database_id)
580            .field("node_to_collect", &self.node_to_collect)
581            .finish()
582    }
583}
584
585impl DatabaseInitialBarrierCollector {
586    pub(super) fn is_collected(&self) -> bool {
587        self.node_to_collect.is_empty()
588            && self
589                .creating_streaming_job_controls
590                .values()
591                .all(|job| job.is_empty())
592    }
593
594    pub(super) fn database_state(
595        &self,
596    ) -> (
597        &BarrierWorkerState,
598        &HashMap<JobId, CreatingStreamingJobControl>,
599    ) {
600        (&self.database_state, &self.creating_streaming_job_controls)
601    }
602
603    pub(super) fn creating_job_ids(&self) -> impl Iterator<Item = JobId> {
604        self.creating_streaming_job_controls.keys().copied()
605    }
606
607    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
608        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
609        assert_eq!(self.database_id, database_id);
610        if let Some(creating_job_id) = creating_job_id {
611            assert!(
612                !self
613                    .creating_streaming_job_controls
614                    .get_mut(&creating_job_id)
615                    .expect("should exist")
616                    .collect(resp),
617                "unlikely to finish backfill since just recovered"
618            );
619        } else {
620            assert_eq!(resp.epoch, self.committed_epoch);
621            assert!(self.node_to_collect.remove(&resp.worker_id));
622        }
623    }
624
625    pub(super) fn finish(self) -> DatabaseCheckpointControl {
626        assert!(self.is_collected());
627        DatabaseCheckpointControl::recovery(
628            self.database_id,
629            self.database_state,
630            self.committed_epoch,
631            self.database_info,
632            self.creating_streaming_job_controls,
633        )
634    }
635
636    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
637        is_valid_after_worker_err(&self.node_to_collect, worker_id)
638            && self
639                .creating_streaming_job_controls
640                .values()
641                .all(|job| job.is_valid_after_worker_err(worker_id))
642    }
643}
644
645impl ControlStreamManager {
    /// Extract information from the barrier worker runtime snapshot loaded during recovery, and inject the initial barrier.
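    ///
    /// At a high level this registers the database-level partial graph, classifies recovered
    /// jobs into already-created, background-creating and snapshot-backfill jobs, resolves the
    /// committed epoch shared by the database's state tables, rebuilds source split and CDC
    /// snapshot split assignments, injects the initial `Add` barrier, and finally recovers a
    /// `CreatingStreamingJobControl` for each still-backfilling snapshot backfill job.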
647    #[expect(clippy::too_many_arguments)]
648    pub(super) fn inject_database_initial_barrier(
649        &mut self,
650        database_id: DatabaseId,
651        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
652        backfill_orders: HashMap<JobId, HashMap<FragmentId, Vec<FragmentId>>>,
653        state_table_committed_epochs: &mut HashMap<TableId, u64>,
654        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
655        fragment_relations: &FragmentDownstreamRelation,
656        stream_actors: &HashMap<ActorId, StreamActor>,
657        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
658        background_jobs: &mut HashSet<JobId>,
659        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
660        is_paused: bool,
661        hummock_version_stats: &HummockVersionStats,
662        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
663        injected_creating_jobs: &mut HashSet<JobId>,
664    ) -> MetaResult<DatabaseInitialBarrierCollector> {
665        self.add_partial_graph(database_id, None);
666        fn collect_source_splits(
667            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
668            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
669        ) -> HashMap<ActorId, Vec<SplitImpl>> {
670            fragment_infos
671                .flat_map(|info| info.actors.keys())
672                .filter_map(|actor_id| {
673                    let actor_id = *actor_id as ActorId;
674                    source_splits
675                        .remove(&actor_id)
676                        .map(|splits| (actor_id, splits))
677                })
678                .collect()
679        }
680        fn build_mutation(
681            splits: &HashMap<ActorId, Vec<SplitImpl>>,
682            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
683            backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
684            is_paused: bool,
685        ) -> Mutation {
686            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
687                .into_iter()
688                .collect();
689            Mutation::Add(AddMutation {
                // Actors built during recovery are not treated as newly added actors.
691                actor_dispatchers: Default::default(),
692                added_actors: Default::default(),
693                actor_splits: build_actor_connector_splits(splits),
694                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
695                    splits: cdc_table_snapshot_split_assignment,
696                }),
697                pause: is_paused,
698                subscriptions_to_add: Default::default(),
699                backfill_nodes_to_pause,
700                new_upstream_sinks: Default::default(),
701            })
702        }
703
704        fn resolve_jobs_committed_epoch<'a>(
705            state_table_committed_epochs: &mut HashMap<TableId, u64>,
706            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
707        ) -> u64 {
708            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
709                (
710                    table_id,
711                    state_table_committed_epochs
712                        .remove(&table_id)
713                        .expect("should exist"),
714                )
715            });
716            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
717            for (table_id, epoch) in epochs {
718                assert_eq!(
719                    prev_epoch, epoch,
720                    "{} has different committed epoch to {}",
721                    first_table_id, table_id
722                );
723            }
724            prev_epoch
725        }
726
727        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
728            .keys()
729            .filter_map(|job_id| {
730                mv_depended_subscriptions
731                    .remove(&job_id.as_mv_table_id())
732                    .map(|subscriptions| {
733                        (
734                            job_id.as_mv_table_id(),
735                            subscriptions
736                                .into_iter()
737                                .map(|(subscription_id, retention)| {
738                                    (
739                                        subscription_id.as_subscriber_id(),
740                                        SubscriberType::Subscription(retention),
741                                    )
742                                })
743                                .collect(),
744                        )
745                    })
746            })
747            .collect();
748
749        let mut database_jobs = HashMap::new();
750        let mut snapshot_backfill_jobs = HashMap::new();
751
752        for (job_id, job_fragments) in jobs {
753            if background_jobs.remove(&job_id) {
754                if job_fragments.values().any(|fragment| {
755                    fragment
756                        .fragment_type_mask
757                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
758                }) {
759                    debug!(%job_id, "recovered snapshot backfill job");
760                    snapshot_backfill_jobs.insert(job_id, job_fragments);
761                } else {
762                    database_jobs.insert(job_id, (job_fragments, true));
763                }
764            } else {
765                database_jobs.insert(job_id, (job_fragments, false));
766            }
767        }
768
769        let database_job_log_epochs: HashMap<_, _> = database_jobs
770            .keys()
771            .filter_map(|job_id| {
772                state_table_log_epochs
773                    .remove(&job_id.as_mv_table_id())
774                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
775            })
776            .collect();
777
778        let prev_epoch = resolve_jobs_committed_epoch(
779            state_table_committed_epochs,
780            database_jobs.values().flat_map(|(job, _)| job.values()),
781        );
782        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
783        // Use a different `curr_epoch` for each recovery attempt.
784        let curr_epoch = prev_epoch.next();
785        let barrier_info = BarrierInfo {
786            prev_epoch,
787            curr_epoch,
788            kind: BarrierKind::Initial,
789        };
790
791        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
792        for (job_id, fragment_infos) in snapshot_backfill_jobs {
793            let committed_epoch =
794                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
795            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
800                database_jobs
801                    .try_insert(job_id, (fragment_infos, true))
802                    .expect("non-duplicate");
803                continue;
804            }
805            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
806                fragment_infos
807                    .values()
808                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
809            )?
810            .0
811            .ok_or_else(|| {
812                anyhow!(
813                    "recovered snapshot backfill job {} has no snapshot backfill info",
814                    job_id
815                )
816            })?;
817            let mut snapshot_epoch = None;
818            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
819                .upstream_mv_table_id_to_backfill_epoch
820                .keys()
821                .cloned()
822                .collect();
823            for (upstream_table_id, epoch) in
824                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
825            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} has not set a snapshot epoch for upstream {}", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} for upstream {} differs from snapshot epoch {} of a previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
830                }
831            }
832            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
833                anyhow!(
834                    "snapshot backfill job {} has not set snapshot epoch",
835                    job_id
836                )
837            })?;
838            for upstream_table_id in &upstream_table_ids {
839                subscribers
840                    .entry(*upstream_table_id)
841                    .or_default()
842                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
843                    .expect("non-duplicate");
844            }
845            ongoing_snapshot_backfill_jobs
846                .try_insert(
847                    job_id,
848                    (
849                        fragment_infos,
850                        upstream_table_ids,
851                        committed_epoch,
852                        snapshot_epoch,
853                    ),
854                )
855                .expect("non-duplicated");
856        }
857
858        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
859            HashMap::new();
860
861        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
862            database_jobs
863                .into_iter()
864                .map(|(job_id, (fragment_infos, is_background_creating))| {
865                    let status = if is_background_creating {
866                        let backfill_ordering =
867                            backfill_orders.get(&job_id).cloned().unwrap_or_default();
868                        let locality_fragment_state_table_mapping =
869                            build_locality_fragment_state_table_mapping(&fragment_infos);
870                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
871                            &backfill_ordering,
872                            &fragment_infos,
873                            locality_fragment_state_table_mapping,
874                        );
875                        CreateStreamingJobStatus::Creating {
876                            tracker: CreateMviewProgressTracker::recover(
877                                job_id,
878                                &fragment_infos,
879                                backfill_order_state,
880                                hummock_version_stats,
881                            ),
882                        }
883                    } else {
884                        CreateStreamingJobStatus::Created
885                    };
886                    let cdc_table_backfill_tracker =
887                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
888                            let cdc_fragment = fragment_infos
889                                .values()
890                                .find(|fragment| {
891                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
892                                        fragment.fragment_type_mask,
893                                        &fragment.nodes,
894                                    )
895                                    .is_some()
896                                })
897                                .expect("should have parallel cdc fragment");
898                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
899                            let mut tracker =
900                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
901                            cdc_table_snapshot_split_assignment
902                                .extend(tracker.reassign_splits(cdc_actors)?);
903                            Some(tracker)
904                        } else {
905                            None
906                        };
907                    Ok((
908                        job_id,
909                        InflightStreamingJobInfo {
910                            job_id,
911                            fragment_infos,
912                            subscribers: subscribers
913                                .remove(&job_id.as_mv_table_id())
914                                .unwrap_or_default(),
915                            status,
916                            cdc_table_backfill_tracker,
917                        },
918                    ))
919                })
920                .try_collect::<_, _, MetaError>()
921        }?;
922
923        let mut builder = FragmentEdgeBuilder::new(
924            database_jobs
925                .values()
926                .flat_map(|job| {
927                    job.fragment_infos()
928                        .map(|info| (info, to_partial_graph_id(database_id, None)))
929                })
930                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
931                    |(job_id, (fragments, ..))| {
932                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
933                        fragments
934                            .values()
935                            .map(move |fragment| (fragment, partial_graph_id))
936                    },
937                )),
938            self,
939        );
940        builder.add_relations(fragment_relations);
941        let mut edges = builder.build();
942
943        let node_to_collect = {
944            let new_actors =
945                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
946                    job.fragment_infos.values().map(move |fragment_infos| {
947                        (
948                            fragment_infos.fragment_id,
949                            &fragment_infos.nodes,
950                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
951                                (
952                                    stream_actors.get(actor_id).expect("should exist"),
953                                    actor.worker_id,
954                                )
955                            }),
956                            job.subscribers.keys().copied(),
957                        )
958                    })
959                }));
960
961            let nodes_actors =
962                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
963            let database_job_source_splits =
964                collect_source_splits(database_jobs.values().flatten(), source_splits);
965            let database_backfill_orders = database_jobs
966                .values()
967                .flat_map(|job| {
968                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
969                        backfill_orders
970                            .get(&job.job_id)
971                            .cloned()
972                            .unwrap_or_default()
973                    } else {
974                        HashMap::new()
975                    }
976                })
977                .collect();
978            let mutation = build_mutation(
979                &database_job_source_splits,
980                cdc_table_snapshot_split_assignment,
981                &database_backfill_orders,
982                is_paused,
983            );
984
985            let node_to_collect = self.inject_barrier(
986                database_id,
987                None,
988                Some(mutation),
989                &barrier_info,
990                &nodes_actors,
991                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
992                nodes_actors.keys().copied(),
993                Some(new_actors),
994            )?;
995            debug!(
996                ?node_to_collect,
997                %database_id,
998                "inject initial barrier"
999            );
1000            node_to_collect
1001        };
1002
1003        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
1004            HashMap::new();
1005        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1006            ongoing_snapshot_backfill_jobs
1007        {
1008            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1009                (
1010                    fragment_infos.fragment_id,
1011                    &fragment_infos.nodes,
1012                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1013                        (
1014                            stream_actors.get(actor_id).expect("should exist"),
1015                            actor.worker_id,
1016                        )
1017                    }),
                    vec![], // no subscribers for backfilling jobs
1019                )
1020            }));
1021
1022            let database_job_source_splits =
1023                collect_source_splits(database_jobs.values().flatten(), source_splits);
1024            assert!(
1025                !cdc_table_snapshot_splits.contains_key(&job_id),
1026                "snapshot backfill job {job_id} should not have cdc backfill"
1027            );
1028            if is_paused {
1029                bail!("should not pause when having snapshot backfill job {job_id}");
1030            }
1031            let job_backfill_orders = backfill_orders.get(&job_id).cloned().unwrap_or_default();
1032            let mutation = build_mutation(
1033                &database_job_source_splits,
                Default::default(), // no cdc table snapshot splits for snapshot backfill jobs
1035                &job_backfill_orders,
1036                false,
1037            );
1038
            // Add the `job_id` to `injected_creating_jobs` before calling
            // `CreatingStreamingJobControl::recover`, which may create a new partial graph on the compute node.
1041            injected_creating_jobs.insert(job_id);
1042            creating_streaming_job_controls.insert(
1043                job_id,
1044                CreatingStreamingJobControl::recover(
1045                    database_id,
1046                    job_id,
1047                    upstream_table_ids,
1048                    &database_job_log_epochs,
1049                    snapshot_epoch,
1050                    committed_epoch,
1051                    &barrier_info,
1052                    info,
1053                    fragment_relations,
1054                    hummock_version_stats,
1055                    node_actors,
1056                    mutation,
1057                    self,
1058                )?,
1059            );
1060        }
1061
1062        self.env.shared_actor_infos().recover_database(
1063            database_id,
1064            database_jobs
1065                .values()
1066                .flat_map(|info| {
1067                    info.fragment_infos()
1068                        .map(move |fragment| (fragment, info.job_id))
1069                })
1070                .chain(
1071                    creating_streaming_job_controls
1072                        .values()
1073                        .flat_map(|job| job.fragment_infos_with_job_id()),
1074                ),
1075        );
1076
1077        let committed_epoch = barrier_info.prev_epoch();
1078        let new_epoch = barrier_info.curr_epoch;
1079        let database_info = InflightDatabaseInfo::recover(
1080            database_id,
1081            database_jobs.into_values(),
1082            self.env.shared_actor_infos().clone(),
1083        );
1084        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1085        Ok(DatabaseInitialBarrierCollector {
1086            database_id,
1087            node_to_collect,
1088            database_state,
1089            database_info,
1090            creating_streaming_job_controls,
1091            committed_epoch,
1092        })
1093    }
1094
1095    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
1096        self.workers
1097            .iter()
1098            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
1099                WorkerNodeState::Connected { control_stream, .. } => {
1100                    Some((*worker_id, control_stream))
1101                }
1102                WorkerNodeState::Reconnecting(_) => None,
1103            })
1104    }
1105
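    /// Sends an `InjectBarrier` request for the partial graph `(database_id, creating_job_id)`
    /// to every worker listed in `node_actors`. `table_ids_to_sync` is attached only for the
    /// workers in `nodes_to_sync_table`, and any `new_actors` are shipped to their owning
    /// workers as `actors_to_build`. Returns the set of workers the barrier must be collected
    /// from.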
1106    pub(super) fn inject_barrier(
1107        &mut self,
1108        database_id: DatabaseId,
1109        creating_job_id: Option<JobId>,
1110        mutation: Option<Mutation>,
1111        barrier_info: &BarrierInfo,
1112        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1113        table_ids_to_sync: impl Iterator<Item = TableId>,
1114        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1115        mut new_actors: Option<StreamJobActorsToCreate>,
1116    ) -> MetaResult<NodeToCollect> {
1117        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1118            "inject_barrier_err"
1119        ));
1120
1121        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1122
1123        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
1124
1125        nodes_to_sync_table.iter().for_each(|worker_id| {
1126            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1127        });
1128
1129        let mut node_need_collect = NodeToCollect::new();
1130        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1131
1132        node_actors.iter()
1133            .try_for_each(|(worker_id, actor_ids_to_collect)| {
1134                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1135                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1136                    table_ids_to_sync.clone()
1137                } else {
1138                    vec![]
1139                };
1140
1141                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1142                    &&
1143                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
1144                {
1145                    control_stream
1146                } else {
1147                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
1148                };
1149
1150                {
1151                    let mutation = mutation.clone();
1152                    let barrier = Barrier {
1153                        epoch: Some(risingwave_pb::data::Epoch {
1154                            curr: barrier_info.curr_epoch(),
1155                            prev: barrier_info.prev_epoch(),
1156                        }),
1157                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1158                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1159                            .to_protobuf(),
1160                        kind: barrier_info.kind.to_protobuf() as i32,
1161                    };
1162
1163                    node.handle
1164                        .request_sender
1165                        .send(StreamingControlStreamRequest {
1166                            request: Some(
1167                                streaming_control_stream_request::Request::InjectBarrier(
1168                                    InjectBarrierRequest {
1169                                        request_id: Uuid::new_v4().to_string(),
1170                                        barrier: Some(barrier),
1171                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1172                                        table_ids_to_sync,
1173                                        partial_graph_id,
1174                                        actors_to_build: new_actors
1175                                            .as_mut()
1176                                            .map(|new_actors| new_actors.remove(worker_id))
1177                                            .into_iter()
1178                                            .flatten()
1179                                            .flatten()
1180                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1181                                                FragmentBuildActorInfo {
1182                                                    fragment_id,
1183                                                    node: Some(node),
1184                                                    actors: actors
1185                                                        .into_iter()
1186                                                        .map(|(actor, upstreams, dispatchers)| {
1187                                                            BuildActorInfo {
1188                                                                actor_id: actor.actor_id,
1189                                                                fragment_upstreams: upstreams
1190                                                                    .into_iter()
1191                                                                    .map(|(fragment_id, upstreams)| {
1192                                                                        (
1193                                                                            fragment_id,
1194                                                                            UpstreamActors {
1195                                                                                actors: upstreams
1196                                                                                    .into_values()
1197                                                                                    .collect(),
1198                                                                            },
1199                                                                        )
1200                                                                    })
1201                                                                    .collect(),
1202                                                                dispatchers,
1203                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1204                                                                mview_definition: actor.mview_definition,
1205                                                                expr_context: actor.expr_context,
1206                                                                config_override: actor.config_override.to_string(),
1207                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1208                                                            }
1209                                                        })
1210                                                        .collect(),
1211                                                }
1212                                            })
1213                                            .collect(),
1214                                    },
1215                                ),
1216                            ),
1217                        })
1218                        .map_err(|_| {
1219                            MetaError::from(anyhow!(
1220                                "failed to send request to {} {:?}",
1221                                node.worker_id,
1222                                node.host
1223                            ))
1224                        })?;
1225
1226                    node_need_collect.insert(*worker_id);
1227                    Result::<_, MetaError>::Ok(())
1228                }
1229            })
1230            .inspect_err(|e| {
1231                // Record failure in event log.
1232                use risingwave_pb::meta::event_log;
1233                let event = event_log::EventInjectBarrierFail {
1234                    prev_epoch: barrier_info.prev_epoch(),
1235                    cur_epoch: barrier_info.curr_epoch(),
1236                    error: e.to_report_string(),
1237                };
1238                self.env
1239                    .event_log_manager_ref()
1240                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1241            })?;
1242        Ok(node_need_collect)
1243    }
1244
1245    pub(super) fn add_partial_graph(
1246        &mut self,
1247        database_id: DatabaseId,
1248        creating_job_id: Option<JobId>,
1249    ) {
1250        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
1251        self.connected_workers().for_each(|(_, node)| {
1252            if node
1253                .handle
1254                .request_sender
1255                .send(StreamingControlStreamRequest {
1256                    request: Some(
1257                        streaming_control_stream_request::Request::CreatePartialGraph(
1258                            CreatePartialGraphRequest {
1259                                partial_graph_id,
1260                            },
1261                        ),
1262                    ),
1263                }).is_err() {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "failed to add partial graph to worker")
1265            }
1266        });
1267    }
1268
1269    pub(super) fn remove_partial_graph(
1270        &mut self,
1271        database_id: DatabaseId,
1272        creating_job_ids: Vec<JobId>,
1273    ) {
1274        if creating_job_ids.is_empty() {
1275            return;
1276        }
1277        let partial_graph_ids = creating_job_ids
1278            .into_iter()
1279            .map(|job_id| to_partial_graph_id(database_id, Some(job_id)))
1280            .collect_vec();
1281        self.connected_workers().for_each(|(_, node)| {
1282            if node.handle
1283                .request_sender
1284                .send(StreamingControlStreamRequest {
1285                    request: Some(
1286                        streaming_control_stream_request::Request::RemovePartialGraph(
1287                            RemovePartialGraphRequest {
1288                                partial_graph_ids: partial_graph_ids.clone(),
1289                            },
1290                        ),
1291                    ),
1292                })
1293                .is_err()
1294            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
1296            }
1297        })
1298    }
1299
1300    pub(super) fn reset_partial_graphs(
1301        &mut self,
1302        partial_graph_ids: Vec<PartialGraphId>,
1303    ) -> HashSet<WorkerId> {
1304        self.connected_workers()
1305            .filter_map(|(worker_id, node)| {
1306                if node
1307                    .handle
1308                    .request_sender
1309                    .send(StreamingControlStreamRequest {
1310                        request: Some(
1311                            streaming_control_stream_request::Request::ResetPartialGraphs(
1312                                ResetPartialGraphsRequest {
1313                                    partial_graph_ids: partial_graph_ids.clone(),
1314                                },
1315                            ),
1316                        ),
1317                    })
1318                    .is_err()
1319                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
1321                    None
1322                } else {
1323                    Some(worker_id)
1324                }
1325            })
1326            .collect()
1327    }
1328}
1329
1330impl GlobalBarrierWorkerContextImpl {
1331    pub(super) async fn new_control_stream_impl(
1332        &self,
1333        node: &WorkerNode,
1334        init_request: &PbInitRequest,
1335    ) -> MetaResult<StreamingControlHandle> {
1336        let handle = self
1337            .env
1338            .stream_client_pool()
1339            .get(node)
1340            .await?
1341            .start_streaming_control(init_request.clone())
1342            .await?;
1343        Ok(handle)
1344    }
1345}
1346
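/// Merges per-worker RPC errors into a single `MetaError`: with no errors the bare message
/// is returned, a single error is wrapped with its worker id, scored errors are reduced to
/// the highest-scoring one, and unscored errors are concatenated into one message.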
1347pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1348    message: &str,
1349    errors: impl IntoIterator<Item = (WorkerId, E)>,
1350) -> MetaError {
1351    use std::fmt::Write;
1352
1353    use risingwave_common::error::error_request_copy;
1354    use risingwave_common::error::tonic::extra::Score;
1355
1356    let errors = errors.into_iter().collect_vec();
1357
1358    if errors.is_empty() {
1359        return anyhow!(message.to_owned()).into();
1360    }
1361
1362    // Create the error from the single error.
1363    let single_error = |(worker_id, e)| {
1364        anyhow::Error::from(e)
1365            .context(format!("{message}, in worker node {worker_id}"))
1366            .into()
1367    };
1368
1369    if errors.len() == 1 {
1370        return single_error(errors.into_iter().next().unwrap());
1371    }
1372
1373    // Find the error with the highest score.
1374    let max_score = errors
1375        .iter()
1376        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1377        .max();
1378
1379    if let Some(max_score) = max_score {
1380        let mut errors = errors;
1381        let max_scored = errors
1382            .extract_if(.., |(_, e)| {
1383                error_request_copy::<Score>(e) == Some(max_score)
1384            })
1385            .next()
1386            .unwrap();
1387
1388        return single_error(max_scored);
1389    }
1390
1391    // The errors do not have scores, so simply concatenate them.
1392    let concat: String = errors
1393        .into_iter()
1394        .fold(format!("{message}: "), |mut s, (w, e)| {
1395            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1396            s
1397        });
1398    anyhow!(concat).into()
1399}
1400
1401#[cfg(test)]
1402mod test_partial_graph_id {
1403    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
1404
1405    #[test]
1406    fn test_partial_graph_id_conversion() {
1407        let database_id = 233.into();
1408        let job_id = 233.into();
1409        assert_eq!(
1410            (database_id, None),
1411            from_partial_graph_id(to_partial_graph_id(database_id, None))
1412        );
1413        assert_eq!(
1414            (database_id, Some(job_id)),
1415            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
1416        );
1417    }
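
    /// Small illustrative check (a sketch mirroring `to_partial_graph_id`): the database id
    /// sits in the upper 32 bits, and `u32::MAX` in the lower 32 bits marks the database-level
    /// graph rather than a creating job. The ids used here are arbitrary.
    #[test]
    fn test_partial_graph_id_layout() {
        let database_id = 233.into();
        let job_id = 42.into();

        // `u32::MAX` in the lower half denotes the database-level graph.
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(((233u64 << 32) | u32::MAX as u64).into())
        );
        // Otherwise the lower half carries the creating job id.
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(((233u64 << 32) | 42u64).into())
        );
    }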
1418}