risingwave_meta/barrier/
rpc.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46    BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50    RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
54    streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
69    DatabaseCheckpointControlMetrics,
70};
71use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
72use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
73use crate::barrier::info::{
74    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
75    SubscriberType,
76};
77use crate::barrier::partial_graph::PartialGraphRecoverer;
78use crate::barrier::progress::CreateMviewProgressTracker;
79use crate::barrier::utils::NodeToCollect;
80use crate::controller::fragment::InflightFragmentInfo;
81use crate::controller::utils::StreamingJobExtraInfo;
82use crate::manager::MetaSrvEnv;
83use crate::model::{
84    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
85    SubscriptionId,
86};
87use crate::stream::cdc::{
88    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
89};
90use crate::stream::{
91    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
92    build_actor_connector_splits,
93};
94use crate::{MetaError, MetaResult};
95
96pub(super) fn to_partial_graph_id(
97    database_id: DatabaseId,
98    creating_job_id: Option<JobId>,
99) -> PartialGraphId {
100    let raw_job_id = creating_job_id
101        .map(|job_id| {
102            assert_ne!(job_id, u32::MAX);
103            job_id.as_raw_id()
104        })
105        .unwrap_or(u32::MAX);
106    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
107}
108
109pub(super) fn from_partial_graph_id(
110    partial_graph_id: PartialGraphId,
111) -> (DatabaseId, Option<JobId>) {
112    let id = partial_graph_id.as_raw_id();
113    let database_id = (id >> 32) as u32;
114    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
115    let creating_job_id = if raw_creating_job_id == u32::MAX {
116        None
117    } else {
118        Some(JobId::new(raw_creating_job_id))
119    };
120    (database_id.into(), creating_job_id)
121}
122
123pub(super) fn build_locality_fragment_state_table_mapping(
124    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
125) -> HashMap<FragmentId, Vec<TableId>> {
126    let mut mapping = HashMap::new();
127
128    for (fragment_id, fragment_info) in fragment_infos {
129        let mut state_table_ids = Vec::new();
130        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
131            if let Some(NodeBody::LocalityProvider(locality_provider)) =
132                stream_node.node_body.as_ref()
133            {
134                let state_table_id = locality_provider
135                    .state_table
136                    .as_ref()
137                    .expect("must have state table")
138                    .id;
139                state_table_ids.push(state_table_id);
140                false
141            } else {
142                true
143            }
144        });
145        if !state_table_ids.is_empty() {
146            mapping.insert(*fragment_id, state_table_ids);
147        }
148    }
149
150    mapping
151}
152
153pub(super) fn database_partial_graphs<'a>(
154    database_id: DatabaseId,
155    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
156) -> impl Iterator<Item = PartialGraphId> + 'a {
157    creating_jobs
158        .map(Some)
159        .chain([None])
160        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
161}
162
/// An established bidirectional control stream to a single streaming worker.
struct ControlStreamNode {
    // Id of the worker this stream is connected to.
    worker_id: WorkerId,
    // Host address of the worker, kept for logging/diagnostics.
    host: HostAddress,
    // RPC handle used to send requests and poll the response stream.
    handle: StreamingControlHandle,
}
168
/// Per-worker connection state tracked by [`ControlStreamManager`].
enum WorkerNodeState {
    /// Control stream is up.
    Connected {
        control_stream: ControlStreamNode,
        /// Set when the worker is unregistered while still connected; the
        /// entry is dropped (instead of reconnected) once its response
        /// stream errors or ends.
        removed: bool,
    },
    /// A pending reconnect attempt (retried indefinitely with backoff).
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
176
/// Owns the control streams from the meta node to all streaming workers and
/// tracks each worker's connect/reconnect state.
pub(super) struct ControlStreamManager {
    // Worker registry: node metadata plus current connection state.
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}
181
impl ControlStreamManager {
    /// Create an empty manager with no worker streams.
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    /// Host address of a tracked worker.
    ///
    /// Panics if the worker is unknown or its `host` field is unset.
    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

    /// Establish a control stream to a newly registered worker, retrying up
    /// to `MAX_RETRY` times with exponential backoff.
    ///
    /// On success the given `partial_graphs` are created on the worker and
    /// the worker is inserted as `Connected`. On exhausting all retries the
    /// failure is only logged and the worker is NOT inserted.
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        partial_graphs: impl Iterator<Item = PartialGraphId>,
        term_id: &String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            // Same worker id must map to the same host.
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                // Already connected: nothing to do.
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                // Drop the pending reconnect future and connect afresh below.
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    // Register all known partial graphs on the fresh stream
                    // before exposing it.
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(partial_graphs);
                    info!(?node_host, "add control stream worker");
                    // Entry was absent (or removed above), so insert must not
                    // overwrite anything.
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // It may happen that the dns information of newly registered worker node
                    // has not been propagated to the meta node and cause error. Wait for a while and retry
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    /// Unregister a worker.
    ///
    /// A connected worker is only *marked* removed — the entry is actually
    /// dropped later when its response stream errors/ends (see the response
    /// polling logic). A worker still reconnecting is removed immediately,
    /// cancelling the pending connect future.
    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

    /// Build a future that retries connecting to `node` indefinitely with
    /// exponential backoff (capped at 1 minute), resolving once a control
    /// stream is established. Failures are logged and retried forever.
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            // `ExponentialBackoff` is an infinite iterator, so the loop never
            // finishes without returning.
            unreachable!("end of retry backoff")
        }.boxed()
    }

    /// Rebuild a manager during recovery by connecting to all given workers
    /// concurrently. Workers that fail the first attempt are entered in
    /// `Reconnecting` state with an endless [`Self::retry_connect`] future.
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        // Dial all workers concurrently; collect (id, node, result) triples.
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    // Input map has unique keys, so insert must not clash.
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    // Keep the worker, but in reconnecting state.
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}
392
/// Borrow of a freshly (re)connected worker's node info and stream handle,
/// used to send the initial `CreatePartialGraph` requests.
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}
397
398impl<'a> WorkerNodeConnected<'a> {
399    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
400        for partial_graph_id in partial_graphs {
401            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
402                request: Some(
403                    streaming_control_stream_request::Request::CreatePartialGraph(
404                        PbCreatePartialGraphRequest { partial_graph_id },
405                    ),
406                ),
407            }) {
408                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
409            }
410        }
411    }
412}
413
/// Event yielded by [`ControlStreamManager::next_event`].
pub(super) enum WorkerNodeEvent<'a> {
    /// A response (or stream error) received from a connected worker.
    Response(MetaResult<streaming_control_stream_response::Response>),
    /// A worker just finished reconnecting; carries the handle so the caller
    /// can re-initialize its partial graphs.
    Connected(WorkerNodeConnected<'a>),
}
418
impl ControlStreamManager {
    /// Poll all workers once for the next event: either a response from a
    /// connected worker, or (when `poll_reconnect` is true) the completion of
    /// a pending reconnect.
    ///
    /// `this_opt` wraps the `&'a mut Self` so the `Connected` branch can
    /// `take()` the reference out and re-borrow the worker entry with the
    /// full `'a` lifetime for the returned event (a workaround for the borrow
    /// checker; once taken, polling again panics on the `expect` below).
    ///
    /// Side effects on error: a worker whose response stream fails is either
    /// dropped (if marked removed, or the worker reported shutdown) or moved
    /// to `Reconnecting` with an endless retry future.
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        // With no workers there is nothing to wake us; stay pending.
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    // Caller only wants responses; skip reconnecting workers.
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                // Promote to connected, then re-borrow the
                                // entry through the taken `'a` reference so
                                // the returned event can hold the handle.
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    // Just set to `Connected` above.
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            // Normalize the stream item into
                            // `Result<Response, (bool, MetaError)>`, where the
                            // bool flags "worker is shutting down" (drop the
                            // worker instead of reconnecting).
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                // Sanity check: a barrier completion must
                                                // come from the worker we polled.
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    // Removed or shutting-down workers are dropped;
                                    // others go into endless reconnect.
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    /// Await the next event (response or reconnect completion) from any worker.
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    /// Await the next response from any connected worker, ignoring pending
    /// reconnects (`poll_reconnect = false`, so `Connected` events cannot occur).
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}
565
/// Tracks initial-barrier collection for one database during recovery: the
/// database is fully collected once every pending partial graph has reported
/// back and been removed from `initializing_partial_graphs`.
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    /// Partial graphs still awaiting initialization acknowledgement.
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    /// The checkpoint control handed out via [`Self::finish`] once collected.
    pub(super) database: DatabaseCheckpointControl,
}
571
572impl Debug for DatabaseInitialBarrierCollector {
573    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
574        f.debug_struct("DatabaseInitialBarrierCollector")
575            .field("database_id", &self.database_id)
576            .field("initializing_graphs", &self.initializing_partial_graphs)
577            .finish()
578    }
579}
580
581impl DatabaseInitialBarrierCollector {
582    pub(super) fn is_collected(&self) -> bool {
583        self.initializing_partial_graphs.is_empty()
584    }
585
586    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
587        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
588    }
589
590    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
591        database_partial_graphs(
592            self.database_id,
593            self.database
594                .creating_streaming_job_controls
595                .keys()
596                .copied(),
597        )
598    }
599
600    pub(super) fn finish(self) -> DatabaseCheckpointControl {
601        assert!(self.is_collected());
602        self.database
603    }
604
605    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
606        self.database.is_valid_after_worker_err(worker_id)
607    }
608}
609
610impl PartialGraphRecoverer<'_> {
611    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
612    #[expect(clippy::too_many_arguments)]
613    pub(super) fn inject_database_initial_barrier(
614        &mut self,
615        database_id: DatabaseId,
616        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
617        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
618        state_table_committed_epochs: &mut HashMap<TableId, u64>,
619        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
620        fragment_relations: &FragmentDownstreamRelation,
621        stream_actors: &HashMap<ActorId, StreamActor>,
622        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
623        background_jobs: &mut HashSet<JobId>,
624        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
625        is_paused: bool,
626        hummock_version_stats: &HummockVersionStats,
627        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
628    ) -> MetaResult<DatabaseCheckpointControl> {
629        fn collect_source_splits(
630            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
631            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
632        ) -> HashMap<ActorId, Vec<SplitImpl>> {
633            fragment_infos
634                .flat_map(|info| info.actors.keys())
635                .filter_map(|actor_id| {
636                    let actor_id = *actor_id as ActorId;
637                    source_splits
638                        .remove(&actor_id)
639                        .map(|splits| (actor_id, splits))
640                })
641                .collect()
642        }
643        fn build_mutation(
644            splits: &HashMap<ActorId, Vec<SplitImpl>>,
645            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
646            backfill_orders: &ExtendedFragmentBackfillOrder,
647            is_paused: bool,
648        ) -> Mutation {
649            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
650                .into_iter()
651                .collect();
652            Mutation::Add(AddMutation {
653                // Actors built during recovery is not treated as newly added actors.
654                actor_dispatchers: Default::default(),
655                added_actors: Default::default(),
656                actor_splits: build_actor_connector_splits(splits),
657                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
658                    splits: cdc_table_snapshot_split_assignment,
659                }),
660                pause: is_paused,
661                subscriptions_to_add: Default::default(),
662                backfill_nodes_to_pause,
663                new_upstream_sinks: Default::default(),
664            })
665        }
666
667        fn resolve_jobs_committed_epoch<'a>(
668            state_table_committed_epochs: &mut HashMap<TableId, u64>,
669            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
670        ) -> u64 {
671            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
672                (
673                    table_id,
674                    state_table_committed_epochs
675                        .remove(&table_id)
676                        .expect("should exist"),
677                )
678            });
679            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
680            for (table_id, epoch) in epochs {
681                assert_eq!(
682                    prev_epoch, epoch,
683                    "{} has different committed epoch to {}",
684                    first_table_id, table_id
685                );
686            }
687            prev_epoch
688        }
689        fn job_backfill_orders(
690            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
691            job_id: JobId,
692        ) -> UserDefinedFragmentBackfillOrder {
693            UserDefinedFragmentBackfillOrder::new(
694                job_extra_info
695                    .get(&job_id)
696                    .and_then(|info| info.backfill_orders.clone())
697                    .map_or_else(HashMap::new, |orders| orders.0),
698            )
699        }
700
701        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
702            .keys()
703            .filter_map(|job_id| {
704                mv_depended_subscriptions
705                    .remove(&job_id.as_mv_table_id())
706                    .map(|subscriptions| {
707                        (
708                            job_id.as_mv_table_id(),
709                            subscriptions
710                                .into_iter()
711                                .map(|(subscription_id, retention)| {
712                                    (
713                                        subscription_id.as_subscriber_id(),
714                                        SubscriberType::Subscription(retention),
715                                    )
716                                })
717                                .collect(),
718                        )
719                    })
720            })
721            .collect();
722
723        let mut database_jobs = HashMap::new();
724        let mut snapshot_backfill_jobs = HashMap::new();
725
726        for (job_id, job_fragments) in jobs {
727            if background_jobs.remove(&job_id) {
728                if job_fragments.values().any(|fragment| {
729                    fragment
730                        .fragment_type_mask
731                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
732                }) {
733                    debug!(%job_id, "recovered snapshot backfill job");
734                    snapshot_backfill_jobs.insert(job_id, job_fragments);
735                } else {
736                    database_jobs.insert(job_id, (job_fragments, true));
737                }
738            } else {
739                database_jobs.insert(job_id, (job_fragments, false));
740            }
741        }
742
743        let database_job_log_epochs: HashMap<_, _> = database_jobs
744            .keys()
745            .filter_map(|job_id| {
746                state_table_log_epochs
747                    .remove(&job_id.as_mv_table_id())
748                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
749            })
750            .collect();
751
752        let prev_epoch = resolve_jobs_committed_epoch(
753            state_table_committed_epochs,
754            database_jobs.values().flat_map(|(job, _)| job.values()),
755        );
756        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
757        // Use a different `curr_epoch` for each recovery attempt.
758        let curr_epoch = prev_epoch.next();
759        let barrier_info = BarrierInfo {
760            prev_epoch,
761            curr_epoch,
762            kind: BarrierKind::Initial,
763        };
764
765        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
766        for (job_id, fragment_infos) in snapshot_backfill_jobs {
767            let committed_epoch =
768                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
769            if committed_epoch == barrier_info.prev_epoch() {
770                info!(
771                    "recovered creating snapshot backfill job {} catch up with upstream already",
772                    job_id
773                );
774                database_jobs
775                    .try_insert(job_id, (fragment_infos, true))
776                    .expect("non-duplicate");
777                continue;
778            }
779            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
780                fragment_infos
781                    .values()
782                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
783            )?
784            .0
785            .ok_or_else(|| {
786                anyhow!(
787                    "recovered snapshot backfill job {} has no snapshot backfill info",
788                    job_id
789                )
790            })?;
791            let mut snapshot_epoch = None;
792            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
793                .upstream_mv_table_id_to_backfill_epoch
794                .keys()
795                .cloned()
796                .collect();
797            for (upstream_table_id, epoch) in
798                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
799            {
800                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
801                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
802                if *snapshot_epoch != epoch {
803                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
804                }
805            }
806            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
807                anyhow!(
808                    "snapshot backfill job {} has not set snapshot epoch",
809                    job_id
810                )
811            })?;
812            for upstream_table_id in &upstream_table_ids {
813                subscribers
814                    .entry(*upstream_table_id)
815                    .or_default()
816                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
817                    .expect("non-duplicate");
818            }
819            ongoing_snapshot_backfill_jobs
820                .try_insert(
821                    job_id,
822                    (
823                        fragment_infos,
824                        upstream_table_ids,
825                        committed_epoch,
826                        snapshot_epoch,
827                    ),
828                )
829                .expect("non-duplicated");
830        }
831
832        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
833            HashMap::new();
834
835        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
836            database_jobs
837                .into_iter()
838                .map(|(job_id, (fragment_infos, is_background_creating))| {
839                    let status = if is_background_creating {
840                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
841                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
842                            backfill_ordering,
843                            fragment_relations,
844                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
845                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
846                        }));
847                        let locality_fragment_state_table_mapping =
848                            build_locality_fragment_state_table_mapping(&fragment_infos);
849                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
850                            &backfill_ordering,
851                            &fragment_infos,
852                            locality_fragment_state_table_mapping,
853                        );
854                        CreateStreamingJobStatus::Creating {
855                            tracker: CreateMviewProgressTracker::recover(
856                                job_id,
857                                &fragment_infos,
858                                backfill_order_state,
859                                hummock_version_stats,
860                            ),
861                        }
862                    } else {
863                        CreateStreamingJobStatus::Created
864                    };
865                    let cdc_table_backfill_tracker =
866                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
867                            let cdc_fragment = fragment_infos
868                                .values()
869                                .find(|fragment| {
870                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
871                                        fragment.fragment_type_mask,
872                                        &fragment.nodes,
873                                    )
874                                    .is_some()
875                                })
876                                .expect("should have parallel cdc fragment");
877                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
878                            let mut tracker =
879                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
880                            cdc_table_snapshot_split_assignment
881                                .extend(tracker.reassign_splits(cdc_actors)?);
882                            Some(tracker)
883                        } else {
884                            None
885                        };
886                    Ok((
887                        job_id,
888                        InflightStreamingJobInfo {
889                            job_id,
890                            fragment_infos,
891                            subscribers: subscribers
892                                .remove(&job_id.as_mv_table_id())
893                                .unwrap_or_default(),
894                            status,
895                            cdc_table_backfill_tracker,
896                        },
897                    ))
898                })
899                .try_collect::<_, _, MetaError>()
900        }?;
901
902        let control_stream_manager = self.control_stream_manager();
903        let mut builder = FragmentEdgeBuilder::new(
904            database_jobs
905                .values()
906                .flat_map(|job| {
907                    let partial_graph_id = to_partial_graph_id(database_id, None);
908                    job.fragment_infos().map(move |info| {
909                        (
910                            info.fragment_id,
911                            EdgeBuilderFragmentInfo::from_inflight(
912                                info,
913                                partial_graph_id,
914                                control_stream_manager,
915                            ),
916                        )
917                    })
918                })
919                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
920                    |(job_id, (fragments, ..))| {
921                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
922                        fragments.values().map(move |fragment| {
923                            (
924                                fragment.fragment_id,
925                                EdgeBuilderFragmentInfo::from_inflight(
926                                    fragment,
927                                    partial_graph_id,
928                                    control_stream_manager,
929                                ),
930                            )
931                        })
932                    },
933                )),
934        );
935        builder.add_relations(fragment_relations);
936        let mut edges = builder.build();
937
938        {
939            let new_actors =
940                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
941                    job.fragment_infos.values().map(move |fragment_infos| {
942                        (
943                            fragment_infos.fragment_id,
944                            &fragment_infos.nodes,
945                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
946                                (
947                                    stream_actors.get(actor_id).expect("should exist"),
948                                    actor.worker_id,
949                                )
950                            }),
951                            job.subscribers.keys().copied(),
952                        )
953                    })
954                }));
955
956            let nodes_actors =
957                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
958            let database_job_source_splits =
959                collect_source_splits(database_jobs.values().flatten(), source_splits);
960            let database_backfill_orders =
961                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
962                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
963                        job_backfill_orders(job_extra_info, job.job_id)
964                    } else {
965                        UserDefinedFragmentBackfillOrder::default()
966                    }
967                }));
968            let database_backfill_orders =
969                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
970                    database_backfill_orders,
971                    fragment_relations,
972                    || {
973                        database_jobs.values().flat_map(|job_fragments| {
974                            job_fragments
975                                .fragment_infos
976                                .iter()
977                                .map(|(fragment_id, fragment)| {
978                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
979                                })
980                        })
981                    },
982                );
983            let mutation = build_mutation(
984                &database_job_source_splits,
985                cdc_table_snapshot_split_assignment,
986                &database_backfill_orders,
987                is_paused,
988            );
989
990            let partial_graph_id = to_partial_graph_id(database_id, None);
991            self.recover_graph(
992                partial_graph_id,
993                mutation,
994                &barrier_info,
995                &nodes_actors,
996                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
997                new_actors,
998                DatabaseCheckpointControlMetrics::new(database_id),
999            )?;
1000            debug!(
1001                %database_id,
1002                "inject initial barrier"
1003            );
1004        };
1005
1006        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
1007            HashMap::new();
1008        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1009            ongoing_snapshot_backfill_jobs
1010        {
1011            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1012                (
1013                    fragment_infos.fragment_id,
1014                    &fragment_infos.nodes,
1015                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1016                        (
1017                            stream_actors.get(actor_id).expect("should exist"),
1018                            actor.worker_id,
1019                        )
1020                    }),
1021                    vec![], // no subscribers for backfilling jobs,
1022                )
1023            }));
1024
1025            let database_job_source_splits =
1026                collect_source_splits(database_jobs.values().flatten(), source_splits);
1027            assert!(
1028                !cdc_table_snapshot_splits.contains_key(&job_id),
1029                "snapshot backfill job {job_id} should not have cdc backfill"
1030            );
1031            if is_paused {
1032                bail!("should not pause when having snapshot backfill job {job_id}");
1033            }
1034            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1035            let job_backfill_orders =
1036                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1037                    job_backfill_orders,
1038                    fragment_relations,
1039                    || {
1040                        info.iter().map(|(fragment_id, fragment)| {
1041                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1042                        })
1043                    },
1044                );
1045            let mutation = build_mutation(
1046                &database_job_source_splits,
1047                Default::default(), // no cdc backfill job for
1048                &job_backfill_orders,
1049                false,
1050            );
1051
1052            let job = CreatingStreamingJobControl::recover(
1053                database_id,
1054                job_id,
1055                upstream_table_ids,
1056                &database_job_log_epochs,
1057                snapshot_epoch,
1058                committed_epoch,
1059                &barrier_info,
1060                info,
1061                job_backfill_orders,
1062                fragment_relations,
1063                hummock_version_stats,
1064                node_actors,
1065                mutation.clone(),
1066                self,
1067            )?;
1068            creating_streaming_job_controls.insert(job_id, job);
1069        }
1070
1071        self.control_stream_manager()
1072            .env
1073            .shared_actor_infos()
1074            .recover_database(
1075                database_id,
1076                database_jobs
1077                    .values()
1078                    .flat_map(|info| {
1079                        info.fragment_infos()
1080                            .map(move |fragment| (fragment, info.job_id))
1081                    })
1082                    .chain(
1083                        creating_streaming_job_controls
1084                            .values()
1085                            .flat_map(|job| job.fragment_infos_with_job_id()),
1086                    ),
1087            );
1088
1089        let committed_epoch = barrier_info.prev_epoch();
1090        let new_epoch = barrier_info.curr_epoch;
1091        let database_info = InflightDatabaseInfo::recover(
1092            database_id,
1093            database_jobs.into_values(),
1094            self.control_stream_manager()
1095                .env
1096                .shared_actor_infos()
1097                .clone(),
1098        );
1099        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1100        Ok(DatabaseCheckpointControl::recovery(
1101            database_id,
1102            database_state,
1103            committed_epoch,
1104            database_info,
1105            creating_streaming_job_controls,
1106        ))
1107    }
1108}
1109
impl ControlStreamManager {
    /// Iterate over workers whose control stream is currently in the
    /// `Connected` state, yielding the worker id and its control-stream node.
    /// Workers in the `Reconnecting` state are skipped.
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

    /// Inject a barrier into the given partial graph by sending an
    /// `InjectBarrier` request to every worker listed in `node_actors`.
    ///
    /// - `mutation`: optional barrier mutation attached to the barrier proto.
    /// - `node_actors`: per-worker actor ids that must collect this barrier;
    ///   every entry must be non-empty.
    /// - `table_ids_to_sync` / `nodes_to_sync_table`: the table ids are sent
    ///   only to workers contained in `nodes_to_sync_table`; other workers
    ///   receive an empty list. Each worker in `nodes_to_sync_table` must also
    ///   appear in `node_actors`.
    /// - `new_actors`: actors to build on each worker, drained per worker and
    ///   converted into `FragmentBuildActorInfo` protos.
    ///
    /// Returns the set of workers the request was sent to (i.e. the workers to
    /// later collect barrier completion from). Fails if any target worker is
    /// not connected or its request channel is closed; failures are also
    /// recorded in the event log.
    pub(super) fn inject_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        // Failure-injection point for tests (`fail` crate).
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        // Invariant: a worker asked to sync tables must also have actors to collect.
        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
        });

        let mut node_need_collect = NodeToCollect::new();
        // Materialize once so the list can be cloned per syncing worker below.
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors.iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
                // Only workers in `nodes_to_sync_table` receive the table ids to sync.
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                // The target worker must currently be connected; otherwise abort
                // the whole injection with an error.
                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    &&
                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        // The extra clone is only used to test for `Some`; the
                        // original clone is then moved into `BarrierMutation`.
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        // Drain this worker's entry from `new_actors` (if any)
                                        // and convert it into build-actor protos.
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    // Mark this worker as one to collect the barrier from.
                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    /// Best-effort broadcast of a `CreatePartialGraph` request for
    /// `partial_graph_id` to all currently connected workers.
    /// Send failures are logged as warnings rather than propagated.
    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                partial_graph_id,
                            },
                        ),
                    ),
                }).is_err() {
                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

    /// Best-effort broadcast of a `RemovePartialGraph` request for the given
    /// partial graph ids to all currently connected workers.
    /// Send failures are logged as warnings rather than propagated.
    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
        self.connected_workers().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id,node = ?node.host,"failed to send remove partial graph request");
            }
        })
    }

    /// Broadcast a `ResetPartialGraphs` request for the given partial graph
    /// ids to all connected workers.
    ///
    /// Returns the set of worker ids the request was successfully sent to;
    /// workers whose channel send failed are logged and excluded.
    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host,"failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}
1327
1328impl GlobalBarrierWorkerContextImpl {
1329    pub(super) async fn new_control_stream_impl(
1330        &self,
1331        node: &WorkerNode,
1332        init_request: &PbInitRequest,
1333    ) -> MetaResult<StreamingControlHandle> {
1334        let handle = self
1335            .env
1336            .stream_client_pool()
1337            .get(node)
1338            .await?
1339            .start_streaming_control(init_request.clone())
1340            .await?;
1341        Ok(handle)
1342    }
1343}
1344
1345pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1346    message: &str,
1347    errors: impl IntoIterator<Item = (WorkerId, E)>,
1348) -> MetaError {
1349    use std::fmt::Write;
1350
1351    use risingwave_common::error::error_request_copy;
1352    use risingwave_common::error::tonic::extra::Score;
1353
1354    let errors = errors.into_iter().collect_vec();
1355
1356    if errors.is_empty() {
1357        return anyhow!(message.to_owned()).into();
1358    }
1359
1360    // Create the error from the single error.
1361    let single_error = |(worker_id, e)| {
1362        anyhow::Error::from(e)
1363            .context(format!("{message}, in worker node {worker_id}"))
1364            .into()
1365    };
1366
1367    if errors.len() == 1 {
1368        return single_error(errors.into_iter().next().unwrap());
1369    }
1370
1371    // Find the error with the highest score.
1372    let max_score = errors
1373        .iter()
1374        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1375        .max();
1376
1377    if let Some(max_score) = max_score {
1378        let mut errors = errors;
1379        let max_scored = errors
1380            .extract_if(.., |(_, e)| {
1381                error_request_copy::<Score>(e) == Some(max_score)
1382            })
1383            .next()
1384            .unwrap();
1385
1386        return single_error(max_scored);
1387    }
1388
1389    // The errors do not have scores, so simply concatenate them.
1390    let concat: String = errors
1391        .into_iter()
1392        .fold(format!("{message}: "), |mut s, (w, e)| {
1393            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1394            s
1395        });
1396    anyhow!(concat).into()
1397}
1398
#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    /// Encoding a (database, optional job) pair into a partial graph id and
    /// decoding it back must be lossless for both the `None` and `Some` cases.
    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        for creating_job_id in [None, Some(job_id)] {
            let encoded = to_partial_graph_id(database_id, creating_job_id);
            assert_eq!((database_id, creating_job_id), from_partial_graph_id(encoded));
        }
    }
}